[jboss-cvs] JBoss Messaging SVN: r5241 - in trunk: src/main/org/jboss/messaging/core/client/impl and 15 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Nov 3 08:01:52 EST 2008
Author: timfox
Date: 2008-11-03 08:01:51 -0500 (Mon, 03 Nov 2008)
New Revision: 5241
Modified:
trunk/src/config/jbm-configuration.xml
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/messaging/core/config/Configuration.java
trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReplicateCreateSessionMessage.java
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java
trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest2.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java
Log:
More on flow control
Modified: trunk/src/config/jbm-configuration.xml
===================================================================
--- trunk/src/config/jbm-configuration.xml 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/config/jbm-configuration.xml 2008-11-03 13:01:51 UTC (rev 5241)
@@ -17,11 +17,6 @@
<!-- true to expose JBoss Messaging resources through JMX -->
<jmx-management-enabled>true</jmx-management-enabled>
- <!-- call timeout in milliseconds -->
- <call-timeout>30000</call-timeout>
-
- <packet-confirmation-batch-size>10000</packet-confirmation-batch-size>
-
<connection-scan-period>10000</connection-scan-period>
<!-- Example interceptors
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionFactoryImpl.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -11,7 +11,6 @@
*/
package org.jboss.messaging.core.client.impl;
-import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_CALL_TIMEOUT;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
import java.util.HashSet;
@@ -73,6 +72,9 @@
public static final boolean DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND = false;
public static final boolean DEFAULT_AUTO_GROUP_ID = false;
+
+ public static final long DEFAULT_CALL_TIMEOUT = 30000;
+
public static final int DEFAULT_MAX_CONNECTIONS = 8;
@@ -158,8 +160,7 @@
transportParams,
pingPeriod,
callTimeout,
- maxConnections,
- sendWindowSize);
+ maxConnections);
if (backupConfig != null)
{
backupConnectorFactory = instantiateConnectorFactory(backupConfig.getFactoryClassName());
@@ -170,8 +171,7 @@
backupTransportParams,
pingPeriod,
callTimeout,
- maxConnections,
- sendWindowSize);
+ maxConnections);
}
else
{
@@ -192,6 +192,7 @@
this.blockOnPersistentSend = blockOnPersistentSend;
this.autoGroupId = autoGroupId;
this.maxConnections = maxConnections;
+ log.info("Creating csf with send window size " + this.sendWindowSize);
}
public ClientSessionFactoryImpl(final TransportConfiguration connectorConfig,
@@ -535,7 +536,7 @@
backupConnection = backupConnectionManager.getConnection();
}
- channel1 = connection.getChannel(1, -1);
+ channel1 = connection.getChannel(1, -1, false);
//Lock it - this must be done while the failoverLock is held
channel1.getLock().lock();
@@ -558,7 +559,8 @@
password,
xa,
autoCommitSends,
- autoCommitAcks);
+ autoCommitAcks,
+ sendWindowSize);
Packet pResponse = channel1.sendBlocking(request);
@@ -576,14 +578,12 @@
retry = true;
}
else
- {
-
+ {
CreateSessionResponseMessage response = (CreateSessionResponseMessage)pResponse;
- int packetConfirmationBatchSize = response.getPacketConfirmationBatchSize();
-
Channel sessionChannel = connection.getChannel(sessionChannelID,
- packetConfirmationBatchSize);
+ sendWindowSize,
+ sendWindowSize != -1);
ClientSessionInternal session = new ClientSessionImpl(this,
name,
@@ -680,7 +680,7 @@
for (RemotingConnection conn: conns)
{
- Channel channel1 = conn.getChannel(1, -1);
+ Channel channel1 = conn.getChannel(1, -1, false);
channel1.getLock().lock();
}
@@ -692,7 +692,7 @@
for (RemotingConnection conn: conns)
{
- Channel channel1 = conn.getChannel(1, -1);
+ Channel channel1 = conn.getChannel(1, -1, false);
channel1.getLock().unlock();
}
@@ -704,7 +704,7 @@
for (RemotingConnection conn: conns)
{
- Channel channel1 = conn.getChannel(1, -1);
+ Channel channel1 = conn.getChannel(1, -1, false);
channel1.returnBlocking();
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -697,7 +697,7 @@
Packet request = new ReattachSessionMessage(name, channel.getLastReceivedCommandID());
- Channel channel1 = backupConnection.getChannel(1, -1);
+ Channel channel1 = backupConnection.getChannel(1, -1, false);
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
Modified: trunk/src/main/org/jboss/messaging/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/config/Configuration.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -68,14 +68,6 @@
void setJMXManagementEnabled(boolean enabled);
- long getCallTimeout();
-
- void setCallTimeout(long timeout);
-
- int getPacketConfirmationBatchSize();
-
- void setPacketConfirmationBatchSize(int size);
-
List<String> getInterceptorClassNames();
void setInterceptorClassNames(List<String> interceptors);
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/ConfigurationImpl.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -45,10 +45,6 @@
public static final boolean DEFAULT_JMX_MANAGEMENT_ENABLED = true;
- public static final int DEFAULT_CALL_TIMEOUT = 30000;
-
- public static final int DEFAULT_PACKET_CONFIRMATION_BATCH_SIZE = 1000;
-
public static final long DEFAULT_CONNECTION_SCAN_PERIOD = 1000;
public static final String DEFAULT_BINDINGS_DIRECTORY = "data/bindings";
@@ -93,10 +89,6 @@
protected boolean jmxManagementEnabled = DEFAULT_JMX_MANAGEMENT_ENABLED;
- protected long callTimeout = DEFAULT_CALL_TIMEOUT;
-
- protected int packetConfirmationBatchSize = DEFAULT_PACKET_CONFIRMATION_BATCH_SIZE;
-
protected long connectionScanPeriod = DEFAULT_CONNECTION_SCAN_PERIOD;
protected List<String> interceptorClassNames = new ArrayList<String>();
@@ -187,26 +179,6 @@
requireDestinations = require;
}
- public long getCallTimeout()
- {
- return callTimeout;
- }
-
- public void setCallTimeout(final long timeout)
- {
- callTimeout = timeout;
- }
-
- public int getPacketConfirmationBatchSize()
- {
- return packetConfirmationBatchSize;
- }
-
- public void setPacketConfirmationBatchSize(final int size)
- {
- packetConfirmationBatchSize = size;
- }
-
public long getConnectionScanPeriod()
{
return connectionScanPeriod;
Modified: trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/config/impl/FileConfiguration.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -84,10 +84,6 @@
securityInvalidationInterval = getLong(e, "security-invalidation-interval", securityInvalidationInterval);
- callTimeout = getLong(e, "call-timeout", callTimeout);
-
- packetConfirmationBatchSize = getInteger(e, "packet-confirmation-batch-size", packetConfirmationBatchSize);
-
connectionScanPeriod = getLong(e, "connection-scan-period", connectionScanPeriod);
NodeList interceptorNodes = e.getElementsByTagName("remoting-interceptors");
Modified: trunk/src/main/org/jboss/messaging/core/management/ManagementService.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/management/ManagementService.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -22,8 +22,6 @@
package org.jboss.messaging.core.management;
-import java.util.Set;
-
import javax.management.NotificationBroadcaster;
import javax.management.ObjectName;
@@ -31,7 +29,6 @@
import org.jboss.messaging.core.messagecounter.MessageCounterManager;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
-import org.jboss.messaging.core.security.Role;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
@@ -51,8 +48,7 @@
MessagingServerControlMBean registerServer(PostOffice postOffice,
StorageManager storageManager,
- Configuration configuration,
- HierarchicalRepository<Set<Role>> securityRepository,
+ Configuration configuration,
HierarchicalRepository<QueueSettings> queueSettingsRepository,
MessagingServer messagingServer) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/management/MessagingServerControlMBean.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -92,16 +92,12 @@
public boolean isBackup();
- public long getCallTimeout();
-
public long getConnectionScanPeriod();
public int getJournalBufferReuseSize();
public long getPagingMaxGlobalSizeBytes();
- public int getPacketConfirmationBatchSize();
-
public String getPagingDirectory();
// Operations ----------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -148,8 +148,7 @@
public MessagingServerControlMBean registerServer(final PostOffice postOffice,
final StorageManager storageManager,
- final Configuration configuration,
- final HierarchicalRepository<Set<Role>> securityRepository,
+ final Configuration configuration,
final HierarchicalRepository<QueueSettings> queueSettingsRepository,
final MessagingServer messagingServer) throws Exception
{
@@ -158,8 +157,7 @@
this.queueSettingsRepository = queueSettingsRepository;
managedServer = new MessagingServerControl(postOffice,
storageManager,
- configuration,
- securityRepository,
+ configuration,
queueSettingsRepository,
messagingServer,
messageCounterManager,
Modified: trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/management/impl/MessagingServerControl.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -71,7 +71,6 @@
private final PostOffice postOffice;
private final StorageManager storageManager;
private final Configuration configuration;
- private final HierarchicalRepository<Set<Role>> securityRepository;
private final HierarchicalRepository<QueueSettings> queueSettingsRepository;
private final MessagingServer server;
private final MessageCounterManager messageCounterManager;
@@ -85,7 +84,6 @@
public MessagingServerControl(PostOffice postOffice,
StorageManager storageManager, Configuration configuration,
- HierarchicalRepository<Set<Role>> securityRepository,
HierarchicalRepository<QueueSettings> queueSettingsRepository,
MessagingServer messagingServer, MessageCounterManager messageCounterManager,
NotificationBroadcasterSupport broadcaster) throws Exception
@@ -94,7 +92,6 @@
this.postOffice = postOffice;
this.storageManager = storageManager;
this.configuration = configuration;
- this.securityRepository = securityRepository;
this.queueSettingsRepository = queueSettingsRepository;
this.server = messagingServer;
this.messageCounterManager = messageCounterManager;
@@ -244,11 +241,6 @@
return configuration.getBindingsDirectory();
}
- public long getCallTimeout()
- {
- return configuration.getCallTimeout();
- }
-
public long getConnectionScanPeriod()
{
return configuration.getConnectionScanPeriod();
@@ -294,11 +286,6 @@
return configuration.getPagingMaxGlobalSizeBytes();
}
- public int getPacketConfirmationBatchSize()
- {
- return configuration.getPacketConfirmationBatchSize();
- }
-
public String getPagingDirectory()
{
return configuration.getPagingDirectory();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -26,7 +26,7 @@
{
Object getID();
- Channel getChannel(long channelID, int packetConfirmationBatchSize);
+ Channel getChannel(long channelID, int windowSize, boolean block);
long generateChannelID();
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/ConnectionManagerImpl.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -75,14 +75,11 @@
private Object failConnectionLock = new Object();
- private final int sendWindowSize;
-
public ConnectionManagerImpl(final ConnectorFactory connectorFactory,
final Map<String, Object> params,
final long pingInterval,
final long callTimeout,
- final int maxConnections,
- final int sendWindowSize)
+ final int maxConnections)
{
this.connectorFactory = connectorFactory;
@@ -93,8 +90,6 @@
this.callTimeout = callTimeout;
this.maxConnections = maxConnections;
-
- this.sendWindowSize = sendWindowSize;
}
public RemotingConnection createConnection()
@@ -113,9 +108,9 @@
{
throw new IllegalStateException("Failed to connect");
}
+
+ RemotingConnection connection = new RemotingConnectionImpl(tc, callTimeout, pingInterval, pingExecutor, null);
- RemotingConnection connection = new RemotingConnectionImpl(tc, callTimeout, pingInterval, pingExecutor, null, sendWindowSize);
-
handler.conn = connection;
listener.conn = connection;
@@ -146,7 +141,7 @@
throw new IllegalStateException("Failed to connect");
}
- conn = new RemotingConnectionImpl(tc, callTimeout, pingInterval, pingExecutor, null, sendWindowSize);
+ conn = new RemotingConnectionImpl(tc, callTimeout, pingInterval, pingExecutor, null);
handler.conn = conn;
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -216,10 +216,6 @@
private boolean frozen;
private final Object failLock = new Object();
-
- private final int sendWindowSize;
-
- private final Semaphore sendSemaphore;
// debug only stuff
@@ -235,31 +231,28 @@
final long blockingCallTimeout,
final long pingPeriod,
final ScheduledExecutorService pingExecutor,
- final List<Interceptor> interceptors,
- final int sendWindowSize)
+ final List<Interceptor> interceptors)
{
- this(transportConnection, blockingCallTimeout, pingPeriod, pingExecutor, interceptors, null, true, true, sendWindowSize);
+ this(transportConnection, blockingCallTimeout, pingPeriod, pingExecutor, interceptors, null, true, true);
}
/*
* Create a server side connection
*/
- public RemotingConnectionImpl(final Connection transportConnection,
- final long blockingCallTimeout,
+ public RemotingConnectionImpl(final Connection transportConnection,
final List<Interceptor> interceptors,
final RemotingConnection replicatingConnection,
final boolean active)
{
this(transportConnection,
- blockingCallTimeout,
-1,
+ -1,
null,
interceptors,
replicatingConnection,
active,
- false,
- -1);
+ false);
}
private RemotingConnectionImpl(final Connection transportConnection,
@@ -269,8 +262,7 @@
final List<Interceptor> interceptors,
final RemotingConnection replicatingConnection,
final boolean active,
- final boolean client,
- final int sendWindowSize)
+ final boolean client)
{
this.transportConnection = transportConnection;
@@ -288,24 +280,13 @@
this.pingExecutor = pingExecutor;
// Channel zero is reserved for pinging
- pingChannel = getChannel(0, -1);
+ pingChannel = getChannel(0, -1, false);
pingChannel.setHandler(ppHandler);
this.client = client;
this.createdActive = active;
-
- this.sendWindowSize = sendWindowSize;
-
- if (sendWindowSize != -1)
- {
- this.sendSemaphore = new Semaphore(sendWindowSize, true);
- }
- else
- {
- this.sendSemaphore = null;
- }
}
public void startPinger()
@@ -333,13 +314,14 @@
}
public synchronized Channel getChannel(final long channelID,
- final int packetConfirmationBatchSize)
+ final int windowSize,
+ final boolean block)
{
ChannelImpl channel = channels.get(channelID);
if (channel == null)
{
- channel = new ChannelImpl(this, channelID, packetConfirmationBatchSize);
+ channel = new ChannelImpl(this, channelID, windowSize, block);
channels.put(channelID, channel);
}
@@ -454,7 +436,7 @@
public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
{
final Packet packet = decode(buffer);
-
+
synchronized (transferLock)
{
if (!frozen)
@@ -537,27 +519,7 @@
channel.close();
}
}
-
- private void doWrite(final Packet packet)
- {
- final MessagingBuffer buffer = transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
-
- int size = packet.encode(buffer);
-
- if (packet.isRequiresConfirmations() && sendSemaphore != null)
- {
- try
- {
- sendSemaphore.acquire(size);
- }
- catch (InterruptedException e)
- {
- }
- }
-
- transportConnection.write(buffer);
- }
-
+
private Packet decode(final MessagingBuffer in)
{
final byte packetType = in.getByte();
@@ -861,14 +823,10 @@
private final java.util.Queue<Packet> resendCache;
- private final int packetConfirmationBatchSize;
-
private volatile int firstStoredCommandID;
private volatile int lastReceivedCommandID = -1;
- private volatile int nextConfirmation;
-
private Channel replicatingChannel;
private volatile RemotingConnectionImpl connection;
@@ -886,10 +844,19 @@
private boolean failingOver;
private final Queue<DelayedResult> responseActions = new ConcurrentLinkedQueue<DelayedResult>();
+
+ private final int windowSize;
+
+ private final int confWindowSize;
+
+ private final Semaphore sendSemaphore;
+
+ private int receivedBytes;
private ChannelImpl(final RemotingConnectionImpl connection,
final long id,
- final int packetConfirmationBatchSize)
+ final int windowSize,
+ final boolean block)
{
this.connection = connection;
@@ -898,33 +865,46 @@
if (connection.replicatingConnection != null)
{
// Don't want to send confirmations if replicating to backup
- this.packetConfirmationBatchSize = -1;
+ this.windowSize = -1;
+ this.confWindowSize = -1;
+
//We don't redirect the ping channel
if (id != 0)
{
- replicatingChannel = connection.replicatingConnection.getChannel(id, -1);
+ replicatingChannel = connection.replicatingConnection.getChannel(id, -1, false);
replicatingChannel.setHandler(new ReplicatedPacketsConfirmedChannelHandler());
}
}
else
{
- this.packetConfirmationBatchSize = packetConfirmationBatchSize;
+ this.windowSize = windowSize;
+
+ this.confWindowSize = (int)(0.75 * windowSize);
replicatingChannel = null;
}
- if (this.packetConfirmationBatchSize != -1)
+ if (this.windowSize != -1)
{
resendCache = new ConcurrentLinkedQueue<Packet>();
-
- nextConfirmation = packetConfirmationBatchSize - 1;
+
+ if (block)
+ {
+ sendSemaphore = new Semaphore(windowSize, true);
+ }
+ else
+ {
+ sendSemaphore = null;
+ }
}
else
{
resendCache = null;
+
+ sendSemaphore = null;
}
}
@@ -982,12 +962,7 @@
}
}
- addToCache(packet);
-
- if (connection.active || packet.isWriteAlways())
- {
- connection.doWrite(packet);
- }
+ addToCacheAndWrite(packet, true);
}
finally
{
@@ -1003,6 +978,11 @@
{
throw new MessagingException(MessagingException.NOT_CONNECTED, "Connection is destroyed");
}
+
+ if (connection.blockingCallTimeout == -1)
+ {
+ throw new IllegalStateException("Cannot do a blocking call timeout on a server side connection");
+ }
packet.setChannelID(id);
@@ -1022,12 +1002,10 @@
}
}
- addToCache(packet);
-
response = null;
-
- connection.doWrite(packet);
-
+
+ addToCacheAndWrite(packet, false);
+
long toWait = connection.blockingCallTimeout;
long start = System.currentTimeMillis();
@@ -1104,7 +1082,7 @@
packet.setChannelID(id);
- connection.doWrite(packet);
+ doWrite(packet);
}
}
@@ -1184,7 +1162,7 @@
for (final Packet packet : resendCache)
{
- connection.doWrite(packet);
+ doWrite(packet);
}
}
@@ -1287,32 +1265,64 @@
}
}
}
+
+ private void doWrite(final Packet packet)
+ {
+ final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+ packet.encode(buffer);
+
+ connection.transportConnection.write(buffer);
+ }
+
private void checkConfirmation(final Packet packet)
{
if (resendCache != null && packet.isRequiresConfirmations())
{
lastReceivedCommandID++;
-
- if (lastReceivedCommandID == nextConfirmation)
+
+ receivedBytes += packet.getPacketSize();
+
+ if (receivedBytes >= confWindowSize)
{
final Packet confirmed = new PacketsConfirmedMessage(lastReceivedCommandID);
- nextConfirmation += packetConfirmationBatchSize;
-
confirmed.setChannelID(id);
+
+ receivedBytes = 0;
- connection.doWrite(confirmed);
+ doWrite(confirmed);
}
}
}
- private void addToCache(final Packet packet)
+ private void addToCacheAndWrite(final Packet packet, final boolean checkActive)
{
- if (resendCache != null)
+ final MessagingBuffer buffer = connection.transportConnection.createBuffer(PacketImpl.INITIAL_BUFFER_SIZE);
+
+ int size = packet.encode(buffer);
+
+ if (resendCache != null && packet.isRequiresConfirmations())
{
resendCache.add(packet);
+
+ if (sendSemaphore != null)
+ {
+ try
+ {
+ sendSemaphore.acquire(size);
+ }
+ catch (InterruptedException e)
+ {
+ throw new IllegalStateException("Semaphore interrupted");
+ }
+ }
}
+
+ if (!checkActive || connection.active || packet.isWriteAlways())
+ {
+ connection.transportConnection.write(buffer);
+ }
}
private void clearUpTo(final int lastReceivedCommandID)
@@ -1353,9 +1363,9 @@
firstStoredCommandID += numberToClear;
- if (connection.sendSemaphore != null)
+ if (sendSemaphore != null)
{
- connection.sendSemaphore.release(sizeToFree);
+ sendSemaphore.release(sizeToFree);
}
}
@@ -1367,7 +1377,7 @@
{
case PACKETS_CONFIRMED:
{
- connection.doWrite(packet);
+ doWrite(packet);
break;
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingServiceImpl.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -61,8 +61,6 @@
private final Set<Acceptor> acceptors = new HashSet<Acceptor>();
- private final long callTimeout;
-
private final Map<Object, RemotingConnection> connections = new ConcurrentHashMap<Object, RemotingConnection>();
private Timer failedConnectionTimer;
@@ -99,8 +97,6 @@
}
}
- callTimeout = config.getCallTimeout();
-
connectionScanPeriod = config.getConnectionScanPeriod();
backup = config.isBackup();
@@ -216,13 +212,12 @@
RemotingConnection replicatingConnection = server.getReplicatingConnection();
- RemotingConnection rc = new RemotingConnectionImpl(connection,
- callTimeout,
+ RemotingConnection rc = new RemotingConnectionImpl(connection,
interceptors,
replicatingConnection,
!backup);
- Channel channel1 = rc.getChannel(1, -1);
+ Channel channel1 = rc.getChannel(1, -1, false);
ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -22,10 +22,6 @@
package org.jboss.messaging.core.remoting.impl.wireformat;
-import static org.jboss.messaging.util.DataConstants.SIZE_BYTE;
-import static org.jboss.messaging.util.DataConstants.SIZE_INT;
-import static org.jboss.messaging.util.DataConstants.SIZE_LONG;
-
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
/**
@@ -56,6 +52,8 @@
private boolean autoCommitAcks;
+ private int windowSize;
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
@@ -63,7 +61,7 @@
public CreateSessionMessage(final String name, final long sessionChannelID,
final int version, final String username, final String password,
final boolean xa, final boolean autoCommitSends,
- final boolean autoCommitAcks)
+ final boolean autoCommitAcks, final int windowSize)
{
super(CREATESESSION);
@@ -82,6 +80,8 @@
this.autoCommitSends = autoCommitSends;
this.autoCommitAcks = autoCommitAcks;
+
+ this.windowSize = windowSize;
}
public CreateSessionMessage()
@@ -131,6 +131,11 @@
return this.autoCommitAcks;
}
+ public int getWindowSize()
+ {
+ return this.windowSize;
+ }
+
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putString(name);
@@ -141,6 +146,7 @@
buffer.putBoolean(xa);
buffer.putBoolean(autoCommitSends);
buffer.putBoolean(autoCommitAcks);
+ buffer.putInt(windowSize);
}
public void decodeBody(final MessagingBuffer buffer)
@@ -153,6 +159,7 @@
xa = buffer.getBoolean();
autoCommitSends = buffer.getBoolean();
autoCommitAcks = buffer.getBoolean();
+ windowSize = buffer.getInt();
}
public boolean equals(Object other)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -38,19 +38,15 @@
private int serverVersion;
- private int packetConfirmationBatchSize;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public CreateSessionResponseMessage(final int serverVersion, final int packetConfirmationBatchSize)
+ public CreateSessionResponseMessage(final int serverVersion)
{
super(CREATESESSION_RESP);
this.serverVersion = serverVersion;
-
- this.packetConfirmationBatchSize = packetConfirmationBatchSize;
}
public CreateSessionResponseMessage()
@@ -70,21 +66,14 @@
return serverVersion;
}
- public int getPacketConfirmationBatchSize()
- {
- return packetConfirmationBatchSize;
- }
-
public void encodeBody(final MessagingBuffer buffer)
{
buffer.putInt(serverVersion);
- buffer.putInt(packetConfirmationBatchSize);
}
public void decodeBody(final MessagingBuffer buffer)
{
serverVersion = buffer.getInt();
- packetConfirmationBatchSize = buffer.getInt();
}
public boolean equals(Object other)
@@ -97,7 +86,7 @@
CreateSessionResponseMessage r = (CreateSessionResponseMessage)other;
boolean matches = super.equals(other) &&
- this.packetConfirmationBatchSize == r.packetConfirmationBatchSize;
+ this.serverVersion == r.serverVersion;
return matches;
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -185,15 +185,15 @@
encodeBody(buffer);
+ size = buffer.position();
+
// The length doesn't include the actual length byte
- int len = buffer.position() - DataConstants.SIZE_INT;
+ int len = size - DataConstants.SIZE_INT;
buffer.putInt(0, len);
buffer.flip();
- size = DataConstants.SIZE_INT + len;
-
return size;
}
@@ -202,6 +202,8 @@
channelID = buffer.getLong();
decodeBody(buffer);
+
+ size = buffer.position();
}
public int getPacketSize()
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReplicateCreateSessionMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReplicateCreateSessionMessage.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ReplicateCreateSessionMessage.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -51,6 +51,8 @@
private boolean autoCommitSends;
private boolean autoCommitAcks;
+
+ private int windowSize;
// Static --------------------------------------------------------
@@ -63,7 +65,8 @@
final String password,
final boolean xa,
final boolean autoCommitSends,
- final boolean autoCommitAcks)
+ final boolean autoCommitAcks,
+ final int windowSize)
{
super(REPLICATE_CREATESESSION);
@@ -82,6 +85,8 @@
this.autoCommitSends = autoCommitSends;
this.autoCommitAcks = autoCommitAcks;
+
+ this.windowSize = windowSize;
}
public ReplicateCreateSessionMessage()
@@ -130,6 +135,11 @@
{
return this.autoCommitAcks;
}
+
+ public int getWindowSize()
+ {
+ return this.windowSize;
+ }
public void encodeBody(final MessagingBuffer buffer)
{
@@ -141,6 +151,7 @@
buffer.putBoolean(xa);
buffer.putBoolean(autoCommitSends);
buffer.putBoolean(autoCommitAcks);
+ buffer.putInt(windowSize);
}
public void decodeBody(final MessagingBuffer buffer)
@@ -153,6 +164,7 @@
xa = buffer.getBoolean();
autoCommitSends = buffer.getBoolean();
autoCommitAcks = buffer.getBoolean();
+ windowSize = buffer.getInt();
}
public boolean equals(Object other)
@@ -171,7 +183,8 @@
this.autoCommitSends == r.autoCommitSends &&
this.autoCommitAcks == r.autoCommitAcks &&
(this.username == null ? r.username == null : this.username.equals(r.username)) &&
- (this.password == null ? r.password == null : this.password.equals(r.password));
+ (this.password == null ? r.password == null : this.password.equals(r.password)) &&
+ this.windowSize == r.windowSize;
return matches;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -71,7 +71,8 @@
RemotingConnection remotingConnection,
boolean autoCommitSends,
boolean autoCommitAcks,
- boolean xa) throws Exception;
+ boolean xa,
+ int sendWindowSize) throws Exception;
CreateSessionResponseMessage replicateCreateSession(String name,
long channelID,
@@ -81,7 +82,8 @@
RemotingConnection remotingConnection,
boolean autoCommitSends,
boolean autoCommitAcks,
- boolean xa) throws Exception;
+ boolean xa,
+ int sendWindowSize) throws Exception;
void removeSession(String name) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -231,7 +231,6 @@
serverManagement = managementService.registerServer(postOffice,
storageManager,
configuration,
- securityRepository,
queueSettingsRepository,
this);
@@ -261,8 +260,7 @@
backupConnectorParams,
5000,
30000,
- ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS,
- ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE);
+ ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS);
}
remotingService.setMessagingServer(this);
@@ -408,44 +406,44 @@
if (configuration.isBackup())
{
freezeAllBackupConnections();
-
+
postOffice.activate();
configuration.setBackup(false);
- remotingService.setBackup(false);
+ remotingService.setBackup(false);
}
connection.activate();
}
-
- //We need to prevent any more packets being handled on any connections (from live) as soon as first live connection
- //is created or re-attaches, to prevent a situation like the following:
- //connection 1 create queue A
- //connection 2 fails over
- //A gets activated since no consumers
- //connection 1 create consumer on A
- //connection 1 delivery
- //connection 1 delivery gets replicated
- //can't find message in queue since active was delivered immediately
+
+ // We need to prevent any more packets being handled on any connections (from live) as soon as first live connection
+ // is created or re-attaches, to prevent a situation like the following:
+ // connection 1 create queue A
+ // connection 2 fails over
+ // A gets activated since no consumers
+ // connection 1 create consumer on A
+ // connection 1 delivery
+ // connection 1 delivery gets replicated
+ // can't find message in queue since active was delivered immediately
private void freezeAllBackupConnections()
{
Set<RemotingConnection> connections = new HashSet<RemotingConnection>();
-
- for (ServerSession session: sessions.values())
+
+ for (ServerSession session : sessions.values())
{
connections.add(session.getChannel().getConnection());
}
-
- for (RemotingConnection connection: connections)
+
+ for (RemotingConnection connection : connections)
{
connection.freeze();
}
}
-
+
public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
- final String name,
- final int lastReceivedCommandID) throws Exception
+ final String name,
+ final int lastReceivedCommandID) throws Exception
{
ServerSession session = sessions.get(name);
@@ -468,14 +466,15 @@
}
public CreateSessionResponseMessage replicateCreateSession(final String name,
- final long channelID,
- final String username,
- final String password,
- final int incrementingVersion,
- final RemotingConnection connection,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean xa) throws Exception
+ final long channelID,
+ final String username,
+ final String password,
+ final int incrementingVersion,
+ final RemotingConnection connection,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean xa,
+ final int sendWindowSize) throws Exception
{
return doCreateSession(name,
channelID,
@@ -485,24 +484,21 @@
connection,
autoCommitSends,
autoCommitAcks,
- xa);
+ xa,
+ sendWindowSize);
}
public CreateSessionResponseMessage createSession(final String name,
- final long channelID,
- final String username,
- final String password,
- final int incrementingVersion,
- final RemotingConnection connection,
- final boolean autoCommitSends,
- final boolean autoCommitAcks,
- final boolean xa) throws Exception
+ final long channelID,
+ final String username,
+ final String password,
+ final int incrementingVersion,
+ final RemotingConnection connection,
+ final boolean autoCommitSends,
+ final boolean autoCommitAcks,
+ final boolean xa,
+ final int sendWindowSize)throws Exception
{
-// if (configuration.isBackup())
-// {
-// throw new IllegalStateException("Cannot create a session on a backup server");
-// }
-
checkActivate(connection);
return doCreateSession(name,
@@ -513,7 +509,8 @@
connection,
autoCommitSends,
autoCommitAcks,
- xa);
+ xa,
+ sendWindowSize);
}
public void removeSession(final String name) throws Exception
@@ -569,7 +566,7 @@
// --------------------------------------------------------------------------------------
private final Object createSessionLock = new Object();
-
+
private CreateSessionResponseMessage doCreateSession(final String name,
final long channelID,
final String username,
@@ -578,7 +575,8 @@
final RemotingConnection connection,
final boolean autoCommitSends,
final boolean autoCommitAcks,
- final boolean xa) throws Exception
+ final boolean xa,
+ final int sendWindowSize) throws Exception
{
if (version.getIncrementingVersion() < incrementingVersion)
{
@@ -608,7 +606,7 @@
currentSession.getChannel().close();
}
- Channel channel = connection.getChannel(channelID, configuration.getPacketConfirmationBatchSize());
+ Channel channel = connection.getChannel(channelID, sendWindowSize, false);
final ServerSessionImpl session = new ServerSessionImpl(name,
channelID,
@@ -637,8 +635,7 @@
connection.addFailureListener(session);
- return new CreateSessionResponseMessage(version.getIncrementingVersion(),
- configuration.getPacketConfirmationBatchSize());
+ return new CreateSessionResponseMessage(version.getIncrementingVersion());
}
// Inner classes
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -70,7 +70,8 @@
msg.getVersion(), msg.getUsername(),
msg.getPassword(), msg.isXA(),
msg.isAutoCommitSends(),
- msg.isAutoCommitAcks());
+ msg.isAutoCommitAcks(),
+ msg.getWindowSize());
result = channel1.replicatePacket(replPacket);
}
@@ -97,7 +98,8 @@
connection,
request.isAutoCommitSends(),
request.isAutoCommitAcks(),
- request.isXA());
+ request.isXA(),
+ request.getWindowSize());
break;
}
case REPLICATE_CREATESESSION:
@@ -112,7 +114,8 @@
connection,
request.isAutoCommitSends(),
request.isAutoCommitAcks(),
- request.isXA());
+ request.isXA(),
+ request.getWindowSize());
break;
}
case REATTACH_SESSION:
Modified: trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/src/main/org/jboss/messaging/jms/server/impl/JMSServerDeployer.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -130,7 +130,7 @@
NodeList children = node.getChildNodes();
long pingPeriod = ClientSessionFactoryImpl.DEFAULT_PING_PERIOD;
- long callTimeout = ConfigurationImpl.DEFAULT_CALL_TIMEOUT;
+ long callTimeout = ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
String clientID = null;
int dupsOKBatchSize = DEFAULT_DUPS_OK_BATCH_SIZE;
int consumerWindowSize = ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/clientcrash/ClientCrashTest.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -26,12 +26,12 @@
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_ACKNOWLEDGE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_BLOCK_ON_PERSISTENT_SEND;
+import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CALL_TIMEOUT;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_CONSUMER_WINDOW_SIZE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_PRODUCER_MAX_RATE;
import static org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl.DEFAULT_SEND_WINDOW_SIZE;
-import static org.jboss.messaging.core.config.impl.ConfigurationImpl.DEFAULT_CALL_TIMEOUT;
import junit.framework.TestCase;
import org.jboss.messaging.core.client.ClientConsumer;
@@ -63,7 +63,9 @@
// Constants -----------------------------------------------------
public static final SimpleString QUEUE = new SimpleString("ClientCrashTestQueue");
+
public static final String MESSAGE_TEXT_FROM_SERVER = "ClientCrashTest from server";
+
public static final String MESSAGE_TEXT_FROM_CLIENT = "ClientCrashTest from client";
// Static --------------------------------------------------------
@@ -73,6 +75,7 @@
// Attributes ----------------------------------------------------
private MessagingService messagingService;
+
private ClientSessionFactory sf;
// Constructors --------------------------------------------------
@@ -100,9 +103,8 @@
// spawn a JVM that creates a JMS client, which waits to receive a test
// message
- Process p = SpawnedVMSupport.spawnVM(CrashClient.class
- .getName(), new String[]{Integer
- .toString(numberOfConnectionsOnTheClient)});
+ Process p = SpawnedVMSupport.spawnVM(CrashClient.class.getName(),
+ new String[] { Integer.toString(numberOfConnectionsOnTheClient) });
ClientSession session = sf.createSession(false, true, true, false);
session.createQueue(QUEUE, QUEUE, null, false, false);
@@ -116,10 +118,13 @@
assertNotNull("no message received", messageFromClient);
assertEquals(MESSAGE_TEXT_FROM_CLIENT, messageFromClient.getBody().getString());
- assertActiveConnections(1 + 1); //One local and one from the other vm
+ assertActiveConnections(1 + 1); // One local and one from the other vm
- ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE, false, 0,
- System.currentTimeMillis(), (byte) 1);
+ ClientMessage message = session.createClientMessage(JBossTextMessage.TYPE,
+ false,
+ 0,
+ System.currentTimeMillis(),
+ (byte)1);
message.getBody().putString(ClientCrashTest.MESSAGE_TEXT_FROM_SERVER);
producer.send(message);
@@ -149,23 +154,24 @@
ConfigurationImpl config = new ConfigurationImpl();
config.setSecurityEnabled(false);
- config.getAcceptorConfigurations().add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyAcceptorFactory"));
+ config.getAcceptorConfigurations()
+ .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyAcceptorFactory"));
messagingService = MessagingServiceImpl.newNullStorageMessagingServer(config);
messagingService.start();
sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.netty.NettyConnectorFactory"),
- null,
- 2000,
- DEFAULT_CALL_TIMEOUT,
- DEFAULT_CONSUMER_WINDOW_SIZE,
- DEFAULT_CONSUMER_MAX_RATE,
- DEFAULT_SEND_WINDOW_SIZE,
- DEFAULT_PRODUCER_MAX_RATE,
- DEFAULT_BLOCK_ON_ACKNOWLEDGE,
- DEFAULT_BLOCK_ON_PERSISTENT_SEND,
- DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
- DEFAULT_AUTO_GROUP_ID,
- DEFAULT_MAX_CONNECTIONS);
+ null,
+ 2000,
+ DEFAULT_CALL_TIMEOUT,
+ DEFAULT_CONSUMER_WINDOW_SIZE,
+ DEFAULT_CONSUMER_MAX_RATE,
+ DEFAULT_SEND_WINDOW_SIZE,
+ DEFAULT_PRODUCER_MAX_RATE,
+ DEFAULT_BLOCK_ON_ACKNOWLEDGE,
+ DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+ DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ DEFAULT_AUTO_GROUP_ID,
+ DEFAULT_MAX_CONNECTIONS);
}
@Override
@@ -180,8 +186,7 @@
// Private -------------------------------------------------------
- private void assertActiveConnections(int expectedActiveConnections)
- throws Exception
+ private void assertActiveConnections(int expectedActiveConnections) throws Exception
{
assertEquals(expectedActiveConnections, messagingService.getServer().getServerManagement().getConnectionCount());
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/MultiThreadRandomFailoverTest.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -1237,6 +1237,8 @@
final ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
+
+ sf.setSendWindowSize(32 * 1024);
ClientSession session = sf.createSession(false, false, false, false);
@@ -1321,7 +1323,6 @@
{
Configuration backupConf = new ConfigurationImpl();
backupConf.setSecurityEnabled(false);
- backupConf.setPacketConfirmationBatchSize(10);
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupConf.getAcceptorConfigurations()
.add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
@@ -1336,7 +1337,6 @@
Configuration liveConf = new ConfigurationImpl();
liveConf.setSecurityEnabled(false);
- liveConf.setPacketConfirmationBatchSize(10);
liveConf.getAcceptorConfigurations()
.add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/RandomFailoverTest.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -230,6 +230,8 @@
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
+ sf.setSendWindowSize(32 * 1024);
+
ClientSession session = sf.createSession(false, false, false, false);
Failer failer = startFailer(1000, session);
@@ -499,7 +501,7 @@
final int numMessages = 100;
- final int numSessions = 50;
+ final int numSessions = 10;
Set<ClientConsumer> consumers = new HashSet<ClientConsumer>();
Set<ClientSession> sessions = new HashSet<ClientSession>();
@@ -1468,7 +1470,6 @@
{
Configuration backupConf = new ConfigurationImpl();
backupConf.setSecurityEnabled(false);
- backupConf.setPacketConfirmationBatchSize(10);
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupConf.getAcceptorConfigurations()
.add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
@@ -1483,7 +1484,6 @@
Configuration liveConf = new ConfigurationImpl();
liveConf.setSecurityEnabled(false);
- liveConf.setPacketConfirmationBatchSize(10);
liveConf.getAcceptorConfigurations()
.add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleAutomaticFailoverTest.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -86,6 +86,8 @@
{
ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"));
+ sf.setSendWindowSize(32 * 1024);
+
ClientSession session = sf.createSession(false, true, true, false);
session.createQueue(ADDRESS, ADDRESS, null, false, false);
@@ -140,6 +142,8 @@
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
+ sf.setSendWindowSize(32 * 1024);
+
ClientSession session = sf.createSession(false, true, true, false);
session.createQueue(ADDRESS, ADDRESS, null, false, false);
@@ -219,6 +223,8 @@
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
+ sf.setSendWindowSize(32 * 1024);
+
ClientSession session = sf.createSession(false, true, true, false);
session.createQueue(ADDRESS, ADDRESS, null, false, false);
@@ -301,6 +307,8 @@
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
+ sf.setSendWindowSize(32 * 1024);
+
ClientSession session = sf.createSession(false, true, true, false);
session.createQueue(ADDRESS, ADDRESS, null, false, false);
@@ -370,6 +378,8 @@
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
+ sf.setSendWindowSize(32 * 1024);
+
ClientSession session = sf.createSession(false, true, true, false);
session.createQueue(ADDRESS, ADDRESS, null, false, false);
@@ -441,6 +451,8 @@
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
+ sf.setSendWindowSize(32 * 1024);
+
ClientSession session = sf.createSession(false, true, true, false);
session.createQueue(ADDRESS, ADDRESS, null, false, false);
@@ -526,6 +538,8 @@
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
+ sf.setSendWindowSize(32 * 1024);
+
final int numSessions = ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS * 2;
List<ClientSession> sessions = new ArrayList<ClientSession>();
@@ -615,6 +629,8 @@
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
+ sf.setSendWindowSize(32 * 1024);
+
final int numSessions = ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS * 2;
List<ClientSession> sessions = new ArrayList<ClientSession>();
@@ -646,6 +662,8 @@
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
+ sf.setSendWindowSize(32 * 1024);
+
final int numSessions = ClientSessionFactoryImpl.DEFAULT_MAX_CONNECTIONS * 2;
List<ClientSession> sessions = new ArrayList<ClientSession>();
@@ -682,6 +700,8 @@
new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
backupParams));
+ sf.setSendWindowSize(32 * 1024);
+
ClientSession session = sf.createSession(false, true, true, false);
session.createQueue(ADDRESS, ADDRESS, null, false, false);
@@ -776,7 +796,6 @@
{
Configuration backupConf = new ConfigurationImpl();
backupConf.setSecurityEnabled(false);
- backupConf.setPacketConfirmationBatchSize(10);
backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
backupConf.getAcceptorConfigurations()
.add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
@@ -787,7 +806,6 @@
Configuration liveConf = new ConfigurationImpl();
liveConf.setSecurityEnabled(false);
- liveConf.setPacketConfirmationBatchSize(10);
liveConf.getAcceptorConfigurations()
.add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/SimpleManualFailoverTest.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -181,7 +181,6 @@
{
Configuration server1Conf = new ConfigurationImpl();
server1Conf.setSecurityEnabled(false);
- server1Conf.setPacketConfirmationBatchSize(1);
server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
server1Conf.getAcceptorConfigurations()
.add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
@@ -191,7 +190,6 @@
Configuration server0Conf = new ConfigurationImpl();
server0Conf.setSecurityEnabled(false);
- server0Conf.setPacketConfirmationBatchSize(1);
server0Conf.getAcceptorConfigurations()
.add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
server0Service = MessagingServiceImpl.newNullStorageMessagingServer(server0Conf);
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/remoting/PingTest.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -108,7 +108,7 @@
ConnectorFactory cf = new NettyConnectorFactory();
Map<String, Object> params = new HashMap<String, Object>();
- ConnectionManager cm = new ConnectionManagerImpl(cf, params, PING_INTERVAL, 5000, 1, 1);
+ ConnectionManager cm = new ConnectionManagerImpl(cf, params, PING_INTERVAL, 5000, 1);
RemotingConnection conn = cm.getConnection();
assertNotNull(conn);
@@ -159,7 +159,7 @@
ConnectorFactory cf = new NettyConnectorFactory();
Map<String, Object> params = new HashMap<String, Object>();
- ConnectionManager cm = new ConnectionManagerImpl(cf, params, PING_INTERVAL, 5000, 1, 1);
+ ConnectionManager cm = new ConnectionManagerImpl(cf, params, PING_INTERVAL, 5000, 1);
RemotingConnection conn = cm.getConnection();
assertNotNull(conn);
@@ -210,7 +210,7 @@
ConnectorFactory cf = new NettyConnectorFactory();
Map<String, Object> params = new HashMap<String, Object>();
- ConnectionManager cm = new ConnectionManagerImpl(cf, params, PING_INTERVAL, 5000, 1, 1);
+ ConnectionManager cm = new ConnectionManagerImpl(cf, params, PING_INTERVAL, 5000, 1);
RemotingConnectionImpl conn = (RemotingConnectionImpl)cm.getConnection();
assertEquals(1, cm.numConnections());
@@ -288,7 +288,7 @@
ConnectorFactory cf = new NettyConnectorFactory();
Map<String, Object> params = new HashMap<String, Object>();
- ConnectionManager cm = new ConnectionManagerImpl(cf, params, PING_INTERVAL, 5000, 1, 1);
+ ConnectionManager cm = new ConnectionManagerImpl(cf, params, PING_INTERVAL, 5000, 1);
RemotingConnection conn = cm.getConnection();
assertNotNull(conn);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/ConfigurationImplTest.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -58,8 +58,6 @@
assertEquals(ConfigurationImpl.DEFAULT_SECURITY_INVALIDATION_INTERVAL, conf.getSecurityInvalidationInterval());
assertEquals(ConfigurationImpl.DEFAULT_REQUIRE_DESTINATIONS, conf.isRequireDestinations());
assertEquals(ConfigurationImpl.DEFAULT_SECURITY_ENABLED, conf.isSecurityEnabled());
- assertEquals(ConfigurationImpl.DEFAULT_CALL_TIMEOUT, conf.getCallTimeout());
- assertEquals(ConfigurationImpl.DEFAULT_PACKET_CONFIRMATION_BATCH_SIZE, conf.getPacketConfirmationBatchSize());
assertEquals(ConfigurationImpl.DEFAULT_CONNECTION_SCAN_PERIOD, conf.getConnectionScanPeriod());
assertEquals(ConfigurationImpl.DEFAULT_BINDINGS_DIRECTORY, conf.getBindingsDirectory());
assertEquals(ConfigurationImpl.DEFAULT_CREATE_BINDINGS_DIR, conf.isCreateBindingsDir());
@@ -101,16 +99,8 @@
b = randomBoolean();
conf.setSecurityEnabled(b);
assertEquals(b, conf.isSecurityEnabled());
-
+
l = randomLong();
- conf.setCallTimeout(l);
- assertEquals(l, conf.getCallTimeout());
-
- i = randomInt();
- conf.setPacketConfirmationBatchSize(i);
- assertEquals(i, conf.getPacketConfirmationBatchSize());
-
- l = randomLong();
conf.setConnectionScanPeriod(l);
assertEquals(l, conf.getConnectionScanPeriod());
@@ -190,12 +180,6 @@
conf.setSecurityEnabled(b);
l = randomLong();
- conf.setCallTimeout(l);
-
- i = randomInt();
- conf.setPacketConfirmationBatchSize(i);
-
- l = randomLong();
conf.setConnectionScanPeriod(l);
String s = randomString();
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -43,8 +43,6 @@
assertEquals(5423, conf.getSecurityInvalidationInterval());
assertEquals(false, conf.isRequireDestinations());
assertEquals(false, conf.isSecurityEnabled());
- assertEquals(7654, conf.getCallTimeout());
- assertEquals(543, conf.getPacketConfirmationBatchSize());
assertEquals(6543, conf.getConnectionScanPeriod());
assertEquals("somedir", conf.getBindingsDirectory());
assertEquals(false, conf.isCreateBindingsDir());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest2.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest2.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/config/impl/FileConfigurationTest2.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -43,8 +43,6 @@
assertEquals(ConfigurationImpl.DEFAULT_SECURITY_INVALIDATION_INTERVAL, conf.getSecurityInvalidationInterval());
assertEquals(ConfigurationImpl.DEFAULT_REQUIRE_DESTINATIONS, conf.isRequireDestinations());
assertEquals(ConfigurationImpl.DEFAULT_SECURITY_ENABLED, conf.isSecurityEnabled());
- assertEquals(ConfigurationImpl.DEFAULT_CALL_TIMEOUT, conf.getCallTimeout());
- assertEquals(ConfigurationImpl.DEFAULT_PACKET_CONFIRMATION_BATCH_SIZE, conf.getPacketConfirmationBatchSize());
assertEquals(ConfigurationImpl.DEFAULT_CONNECTION_SCAN_PERIOD, conf.getConnectionScanPeriod());
assertEquals(ConfigurationImpl.DEFAULT_BINDINGS_DIRECTORY, conf.getBindingsDirectory());
assertEquals(ConfigurationImpl.DEFAULT_CREATE_BINDINGS_DIR, conf.isCreateBindingsDir());
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/ManagementServiceImplTest.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -96,7 +96,7 @@
replay(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, messagingServer);
ManagementService service = new ManagementServiceImpl(mbeanServer, true);
- service.registerServer(postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, messagingServer);
+ service.registerServer(postOffice, storageManager, configuration, queueSettingsRepository, messagingServer);
verify(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, messagingServer);
}
@@ -124,7 +124,7 @@
replay(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, messagingServer);
ManagementService service = new ManagementServiceImpl(mbeanServer, true);
- service.registerServer(postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, messagingServer);
+ service.registerServer(postOffice, storageManager, configuration, queueSettingsRepository, messagingServer);
verify(mbeanServer, postOffice, storageManager, configuration, securityRepository, queueSettingsRepository, messagingServer);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java 2008-11-03 09:39:48 UTC (rev 5240)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/management/impl/MessagingServerControlTest.java 2008-11-03 13:01:51 UTC (rev 5241)
@@ -521,7 +521,7 @@
private MessagingServerControl createControl() throws Exception
{
MessagingServerControl control = new MessagingServerControl(postOffice,
- storageManager, configuration, securityRepository,
+ storageManager, configuration,
queueSettingsRepository, server, messageCounterManager, new NotificationBroadcasterSupport());
return control;
}
More information about the jboss-cvs-commits mailing list