Author: timfox
Date: 2009-12-03 05:43:37 -0500 (Thu, 03 Dec 2009)
New Revision: 8521
Modified:
trunk/docs/user-manual/en/configuration-index.xml
trunk/src/config/common/schema/hornetq-configuration.xsd
trunk/src/main/org/hornetq/core/config/Configuration.java
trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateReplicationSessionMessage.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/tests/config/ConfigurationTest-full-config.xml
trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
Log:
don't use reattach on the backup connection
Modified: trunk/docs/user-manual/en/configuration-index.xml
===================================================================
--- trunk/docs/user-manual/en/configuration-index.xml 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/docs/user-manual/en/configuration-index.xml 2009-12-03 10:43:37 UTC (rev 8521)
@@ -57,13 +57,6 @@
<entry/>
</row>
<row>
- <entry><link
linkend="configuring.live.backup"
- >backup-window-size</link></entry>
- <entry>int</entry>
- <entry>The Window Size used to flow control between
live and backup</entry>
- <entry>1 MiB</entry>
- </row>
- <row>
<entry><link
linkend="configuring.bindings.journal"
bindings-directory</link></entry>
<entry>String</entry>
Modified: trunk/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- trunk/src/config/common/schema/hornetq-configuration.xsd 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/src/config/common/schema/hornetq-configuration.xsd 2009-12-03 10:43:37 UTC (rev 8521)
@@ -74,9 +74,7 @@
<xsd:element maxOccurs="1" minOccurs="0"
name="persist-delivery-count-before-delivery" type="xsd:boolean">
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="backup-connector-ref" type="backup-connectorType">
- </xsd:element>
- <xsd:element maxOccurs="1" minOccurs="0"
name="backup-window-size" type="xsd:int">
- </xsd:element>
+ </xsd:element>
<xsd:element maxOccurs="1" minOccurs="0"
name="connectors">
<xsd:complexType>
<xsd:sequence>
Modified: trunk/src/main/org/hornetq/core/config/Configuration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/Configuration.java 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/src/main/org/hornetq/core/config/Configuration.java 2009-12-03 10:43:37 UTC (rev 8521)
@@ -120,10 +120,6 @@
String getBackupConnectorName();
- int getBackupWindowSize();
-
- void setBackupWindowSize(int windowSize);
-
void setBackupConnectorName(String name);
List<BroadcastGroupConfiguration> getBroadcastGroupConfigurations();
Modified: trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/src/main/org/hornetq/core/config/impl/ConfigurationImpl.java 2009-12-03 10:43:37 UTC (rev 8521)
@@ -113,8 +113,6 @@
public static final int DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO = (int)(1000000000d /
300);
public static final int DEFAULT_JOURNAL_BUFFER_SIZE_NIO = 490 * 1024;
-
-
public static final boolean DEFAULT_JOURNAL_LOG_WRITE_RATE = false;
@@ -176,8 +174,6 @@
public static final long DEFAULT_MEMORY_MEASURE_INTERVAL = -1; // in milliseconds
- public static final int DEFAULT_BACKUP_WINDOW_SIZE = 1024 * 1024;
-
public static final String DEFAULT_LOG_DELEGATE_FACTORY_CLASS_NAME =
JULLogDelegateFactory.class.getCanonicalName();
// Attributes
-----------------------------------------------------------------------------
@@ -230,8 +226,6 @@
protected String backupConnectorName;
- protected int backupWindowSize = DEFAULT_BACKUP_WINDOW_SIZE;
-
protected List<BridgeConfiguration> bridgeConfigurations = new
ArrayList<BridgeConfiguration>();
protected List<DivertConfiguration> divertConfigurations = new
ArrayList<DivertConfiguration>();
@@ -274,22 +268,19 @@
protected int journalMinFiles = DEFAULT_JOURNAL_MIN_FILES;
-
- //AIO and NIO need different values for these attributes
-
+ // AIO and NIO need different values for these attributes
+
protected int journalMaxIO_AIO = DEFAULT_JOURNAL_MAX_IO_AIO;
protected int journalBufferTimeout_AIO = DEFAULT_JOURNAL_BUFFER_TIMEOUT_AIO;
protected int journalBufferSize_AIO = DEFAULT_JOURNAL_BUFFER_SIZE_AIO;
-
+
protected int journalMaxIO_NIO = DEFAULT_JOURNAL_MAX_IO_NIO;
protected int journalBufferTimeout_NIO = DEFAULT_JOURNAL_BUFFER_TIMEOUT_NIO;
protected int journalBufferSize_NIO = DEFAULT_JOURNAL_BUFFER_SIZE_NIO;
-
-
protected boolean logJournalWriteRate = DEFAULT_JOURNAL_LOG_WRITE_RATE;
@@ -509,19 +500,6 @@
this.backupConnectorName = backupConnectorName;
}
- /* (non-Javadoc)
- * @see org.hornetq.core.config.Configuration#getBackupWindowSize()
- */
- public int getBackupWindowSize()
- {
- return backupWindowSize;
- }
-
- public void setBackupWindowSize(int windowSize)
- {
- this.backupWindowSize = windowSize;
- }
-
public GroupingHandlerConfiguration getGroupingHandlerConfiguration()
{
return groupingHandlerConfiguration;
@@ -966,8 +944,6 @@
{
this.logDelegateFactoryClassName = className;
}
-
-
public int getJournalMaxIO_AIO()
{
@@ -998,8 +974,7 @@
{
this.journalBufferSize_AIO = journalBufferSize;
}
-
-
+
public int getJournalMaxIO_NIO()
{
return journalMaxIO_NIO;
@@ -1029,8 +1004,6 @@
{
this.journalBufferSize_NIO = journalBufferSize;
}
-
-
@Override
public boolean equals(Object obj)
@@ -1063,11 +1036,6 @@
else if (!bindingsDirectory.equals(other.bindingsDirectory))
return false;
- if (backupWindowSize != other.backupWindowSize)
- {
- return false;
- }
-
if (clustered != other.clustered)
return false;
if (connectionTTLOverride != other.connectionTTLOverride)
@@ -1090,13 +1058,13 @@
if (journalBufferTimeout_AIO != other.journalBufferTimeout_AIO)
return false;
if (journalMaxIO_AIO != other.journalMaxIO_AIO)
- return false;
+ return false;
if (this.journalBufferSize_NIO != other.journalBufferSize_NIO)
return false;
if (journalBufferTimeout_NIO != other.journalBufferTimeout_NIO)
return false;
if (journalMaxIO_NIO != other.journalMaxIO_NIO)
- return false;
+ return false;
if (journalCompactMinFiles != other.journalCompactMinFiles)
return false;
if (journalCompactPercentage != other.journalCompactPercentage)
Modified: trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java
===================================================================
--- trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/src/main/org/hornetq/core/config/impl/FileConfiguration.java 2009-12-03 10:43:37 UTC (rev 8521)
@@ -395,10 +395,7 @@
memoryWarningThreshold = getInteger(e, "memory-warning-threshold",
memoryWarningThreshold, PERCENTAGE);
memoryMeasureInterval = getLong(e, "memory-measure-interval",
memoryMeasureInterval, MINUS_ONE_OR_GT_ZERO); // in
- // milliseconds
- backupWindowSize = getInteger(e, "backup-window-size",
DEFAULT_BACKUP_WINDOW_SIZE, MINUS_ONE_OR_GT_ZERO);
-
started = true;
}
Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateReplicationSessionMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateReplicationSessionMessage.java 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateReplicationSessionMessage.java 2009-12-03 10:43:37 UTC (rev 8521)
@@ -27,19 +27,15 @@
private long sessionChannelID;
- private int windowSize;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public CreateReplicationSessionMessage(final long sessionChannelID, final int
windowSize)
+ public CreateReplicationSessionMessage(final long sessionChannelID)
{
super(CREATE_REPLICATION);
this.sessionChannelID = sessionChannelID;
-
- this.windowSize = windowSize;
}
public CreateReplicationSessionMessage()
@@ -53,14 +49,12 @@
public void encodeRest(final HornetQBuffer buffer)
{
buffer.writeLong(sessionChannelID);
- buffer.writeInt(windowSize);
}
@Override
public void decodeRest(final HornetQBuffer buffer)
{
sessionChannelID = buffer.readLong();
- windowSize = buffer.readInt();
}
/**
@@ -71,14 +65,6 @@
return sessionChannelID;
}
- /**
- * @return the windowSize
- */
- public int getWindowSize()
- {
- return windowSize;
- }
-
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-12-03 10:43:37 UTC (rev 8521)
@@ -173,8 +173,6 @@
response = new HornetQExceptionMessage((HornetQException)e);
}
- channel.confirm(packet);
-
channel.send(response);
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-12-03 10:43:37 UTC (rev 8521)
@@ -65,13 +65,11 @@
// Attributes ----------------------------------------------------
- private final int backupWindowSize;
-
private final ResponseHandler responseHandler = new ResponseHandler();
private final FailoverManager failoverManager;
- private RemotingConnection connection;
+ private RemotingConnection replicatingConnection;
private Channel replicatingChannel;
@@ -92,11 +90,10 @@
/**
* @param replicationConnectionManager
*/
- public ReplicationManagerImpl(final FailoverManager failoverManager, final
ExecutorFactory executorFactory, final int backupWindowSize)
+ public ReplicationManagerImpl(final FailoverManager failoverManager, final
ExecutorFactory executorFactory)
{
super();
- this.failoverManager = failoverManager;
- this.backupWindowSize = backupWindowSize;
+ this.failoverManager = failoverManager;
this.executorFactory = executorFactory;
}
@@ -307,25 +304,25 @@
{
throw new IllegalStateException("ReplicationManager is already
started");
}
- connection = failoverManager.getConnection();
+
+ replicatingConnection = failoverManager.getConnection();
- if (connection == null)
+ if (replicatingConnection == null)
{
log.warn("Backup server MUST be started before live server. Initialisation
will not proceed.");
throw new HornetQException(HornetQException.ILLEGAL_STATE,
"Backup server MUST be started before live
server. Initialisation will not proceed.");
}
- long channelID = connection.generateChannelID();
+ long channelID = replicatingConnection.generateChannelID();
- Channel mainChannel = connection.getChannel(1, -1);
+ Channel mainChannel = replicatingConnection.getChannel(1, -1);
- replicatingChannel = connection.getChannel(channelID, backupWindowSize);
+ replicatingChannel = replicatingConnection.getChannel(channelID, -1);
replicatingChannel.setHandler(responseHandler);
- CreateReplicationSessionMessage replicationStartPackage = new
CreateReplicationSessionMessage(channelID,
-
backupWindowSize);
+ CreateReplicationSessionMessage replicationStartPackage = new
CreateReplicationSessionMessage(channelID);
mainChannel.sendBlocking(replicationStartPackage);
@@ -333,7 +330,16 @@
{
public void connectionFailed(HornetQException me)
{
- log.warn("Connection to the backup node failed, removing replication
now", me);
+ if (me.getCode() == HornetQException.DISCONNECTED)
+ {
+ //Backup has shut down - no need to log a stack trace
+ log.warn("The backup node has been shut-down, replication will now
stop");
+ }
+ else
+ {
+ log.warn("Connection to the backup node failed, removing replication
now", me);
+ }
+
try
{
stop();
@@ -386,17 +392,15 @@
replicatingChannel.close();
}
- started = false;
-
failoverManager.causeExit();
- if (connection != null)
+ if (replicatingConnection != null)
{
- connection.destroy();
+ replicatingConnection.destroy();
}
- connection = null;
-
+ replicatingConnection = null;
+
started = false;
}
@@ -491,8 +495,6 @@
{
replicated();
}
-
- replicatingChannel.confirm(packet);
}
}
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQPacketHandler.java 2009-12-03 10:43:37 UTC (rev 8521)
@@ -192,7 +192,7 @@
try
{
- Channel channel = connection.getChannel(request.getSessionChannelID(),
request.getWindowSize());
+ Channel channel = connection.getChannel(request.getSessionChannelID(), -1);
ReplicationEndpoint endpoint = server.connectToReplicationEndpoint(channel);
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-12-03 10:43:37 UTC (rev 8521)
@@ -870,7 +870,7 @@
* Protected so tests can change this behaviour
* @param backupConnector
*/
- protected FailoverManagerImpl createBackupConnection(final TransportConfiguration
backupConnector,
+ protected FailoverManagerImpl createBackupConnectionFailoverManager(final
TransportConfiguration backupConnector,
final ExecutorService
threadPool,
final ScheduledExecutorService
scheduledPool)
{
@@ -939,11 +939,10 @@
else
{
- replicationFailoverManager = createBackupConnection(backupConnector,
threadPool, scheduledPool);
+ replicationFailoverManager =
createBackupConnectionFailoverManager(backupConnector, threadPool, scheduledPool);
replicationManager = new ReplicationManagerImpl(replicationFailoverManager,
- executorFactory,
-
configuration.getBackupWindowSize());
+ executorFactory);
replicationManager.start();
}
}
Modified: trunk/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- trunk/tests/config/ConfigurationTest-full-config.xml 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/tests/config/ConfigurationTest-full-config.xml 2009-12-03 10:43:37 UTC (rev 8521)
@@ -55,8 +55,7 @@
<class-name>org.hornetq.tests.unit.core.config.impl.TestInterceptor2</class-name>
</remoting-interceptors>
- <backup-connector-ref connector-name="backup-connector" />
- <backup-window-size>1024</backup-window-size>
+ <backup-connector-ref connector-name="backup-connector" />
<connectors>
<connector name="connector1">
<factory-class>org.hornetq.tests.unit.core.config.impl.TestConnectorFactory1</factory-class>
Modified: trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-12-03 10:43:37 UTC (rev 8521)
@@ -119,8 +119,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
- this.factory,
-
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ this.factory);
manager.start();
manager.stop();
}
@@ -146,8 +145,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
- this.factory,
-
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ this.factory);
manager.start();
try
{
@@ -189,16 +187,14 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
- this.factory,
-
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ this.factory);
manager.start();
try
{
ReplicationManagerImpl manager2 = new
ReplicationManagerImpl(failoverManager,
- this.factory,
-
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ this.factory);
manager2.start();
fail("Exception was expected");
@@ -232,8 +228,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
- this.factory,
-
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ this.factory);
try
{
@@ -270,8 +265,7 @@
StorageManager storage = getStorage();
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
- this.factory,
-
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ this.factory);
manager.start();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
manager);
@@ -377,8 +371,7 @@
{
StorageManager storage = getStorage();
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
- this.factory,
-
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ this.factory);
manager.start();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
manager);
@@ -541,8 +534,7 @@
try
{
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
- this.factory,
-
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ this.factory);
manager.start();
fail("Exception expected");
}
@@ -569,8 +561,7 @@
{
StorageManager storage = getStorage();
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
- this.factory,
-
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ this.factory);
manager.start();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
manager);
@@ -621,8 +612,7 @@
{
StorageManager storage = getStorage();
ReplicationManagerImpl manager = new ReplicationManagerImpl(failoverManager,
- this.factory,
-
ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE);
+ this.factory);
manager.start();
Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(),
manager);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/ConfigurationImplTest.java 2009-12-03 10:43:37 UTC (rev 8521)
@@ -98,7 +98,6 @@
assertEquals(ConfigurationImpl.DEFAULT_SERVER_DUMP_INTERVAL,
conf.getServerDumpInterval());
assertEquals(ConfigurationImpl.DEFAULT_MEMORY_WARNING_THRESHOLD,
conf.getMemoryWarningThreshold());
assertEquals(ConfigurationImpl.DEFAULT_MEMORY_MEASURE_INTERVAL,
conf.getMemoryMeasureInterval());
- assertEquals(ConfigurationImpl.DEFAULT_BACKUP_WINDOW_SIZE,
conf.getBackupWindowSize());
}
public void testSetGetAttributes()
@@ -306,10 +305,6 @@
conf.setTransactionTimeoutScanPeriod(l);
assertEquals(l, conf.getTransactionTimeoutScanPeriod());
- i = randomInt();
- conf.setBackupWindowSize(i);
- assertEquals(i, conf.getBackupWindowSize());
-
s = randomString();
conf.setManagementClusterPassword(s);
assertEquals(s, conf.getManagementClusterPassword());
@@ -534,10 +529,6 @@
s = randomString();
conf.setManagementClusterPassword(s);
assertEquals(s, conf.getManagementClusterPassword());
-
- i = randomInt();
- conf.setBackupWindowSize(i);
- assertEquals(i, conf.getBackupWindowSize());
ByteArrayOutputStream baos = new ByteArrayOutputStream();
ObjectOutputStream oos = new ObjectOutputStream(baos);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2009-12-03 10:35:49 UTC (rev 8520)
+++ trunk/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2009-12-03 10:43:37 UTC (rev 8521)
@@ -84,8 +84,7 @@
assertEquals("largemessagesdir", conf.getLargeMessagesDirectory());
assertEquals(95, conf.getMemoryWarningThreshold());
- assertEquals(1024, conf.getBackupWindowSize());
-
+
assertEquals(2, conf.getInterceptorClassNames().size());
assertTrue(conf.getInterceptorClassNames().contains("org.hornetq.tests.unit.core.config.impl.TestInterceptor1"));
assertTrue(conf.getInterceptorClassNames().contains("org.hornetq.tests.unit.core.config.impl.TestInterceptor2"));