JBoss hornetq SVN: r9413 - in branches/2_2_0_HA_Improvements/src: main/org/hornetq/jms/server/impl and 1 other directory.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-19 09:35:56 -0400 (Mon, 19 Jul 2010)
New Revision: 9413
Modified:
branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-jms.xsd
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
Log:
HA refactoring
* revert connectors' XML element to <connectors> (instead of <static-connectors>)
Modified: branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-jms.xsd
===================================================================
--- branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-jms.xsd 2010-07-19 12:19:21 UTC (rev 9412)
+++ branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-jms.xsd 2010-07-19 13:35:56 UTC (rev 9413)
@@ -29,7 +29,7 @@
<xsd:element name="discovery-group-ref" type="discovery-group-refType" maxOccurs="1" minOccurs="0"></xsd:element>
<xsd:element name="discovery-initial-wait-timeout" type="xsd:long" maxOccurs="1" minOccurs="0"></xsd:element>
- <xsd:element name="static-connectors" maxOccurs="1" minOccurs="0">
+ <xsd:element name="connectors" maxOccurs="1" minOccurs="0">
<xsd:complexType>
<xsd:sequence>
<xsd:element name="connector-ref" type="connector-refType"
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2010-07-19 12:19:21 UTC (rev 9412)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerDeployer.java 2010-07-19 13:35:56 UTC (rev 9413)
@@ -44,7 +44,7 @@
protected static final String ENTRY_NODE_NAME = "entry";
- protected static final String CONNECTORS_NODE_NAME = "static-connectors";
+ protected static final String CONNECTORS_NODE_NAME = "connectors";
protected static final String CONNECTION_FACTORY_NODE_NAME = "connection-factory";
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-07-19 12:19:21 UTC (rev 9412)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2010-07-19 13:35:56 UTC (rev 9413)
@@ -1003,15 +1003,15 @@
}
else
{
+ TransportConfiguration[] connectorConfigs = (TransportConfiguration[])cfConfig.getConnectorConfigs().toArray(new TransportConfiguration[cfConfig.getConnectorConfigs().size()]);
+
if (cfConfig.isHA())
{
- cf = HornetQJMSClient.createConnectionFactoryWithHA(cfConfig.getConnectorConfigs()
- .toArray(new TransportConfiguration[0]));
+ cf = HornetQJMSClient.createConnectionFactoryWithHA(connectorConfigs);
}
else
{
- cf = HornetQJMSClient.createConnectionFactoryWithoutHA(cfConfig.getConnectorConfigs()
- .toArray(new TransportConfiguration[0]));
+ cf = HornetQJMSClient.createConnectionFactoryWithoutHA(connectorConfigs);
}
}
13 years, 11 months
JBoss hornetq SVN: r9412 - branches/2_2_0_HA_Improvements/examples/core/perf/src/org/hornetq/core/example.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-19 08:19:21 -0400 (Mon, 19 Jul 2010)
New Revision: 9412
Modified:
branches/2_2_0_HA_Improvements/examples/core/perf/src/org/hornetq/core/example/PerfBase.java
Log:
HA refactoring
* fix core perf example to use ServerLocator API
Modified: branches/2_2_0_HA_Improvements/examples/core/perf/src/org/hornetq/core/example/PerfBase.java
===================================================================
--- branches/2_2_0_HA_Improvements/examples/core/perf/src/org/hornetq/core/example/PerfBase.java 2010-07-19 12:12:07 UTC (rev 9411)
+++ branches/2_2_0_HA_Improvements/examples/core/perf/src/org/hornetq/core/example/PerfBase.java 2010-07-19 12:19:21 UTC (rev 9412)
@@ -195,16 +195,17 @@
params.put(TransportConstants.HOST_PROP_NAME, perfParams.getHost());
params.put(TransportConstants.PORT_PROP_NAME, perfParams.getPort());
- factory = HornetQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName(), params));
- factory.setPreAcknowledge(perfParams.isPreAck());
- factory.setConfirmationWindowSize(perfParams.getConfirmationWindow());
- factory.setProducerWindowSize(perfParams.getProducerWindow());
- factory.setConsumerWindowSize(perfParams.getConsumerWindow());
+ ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), params));
+ serverLocator.setPreAcknowledge(perfParams.isPreAck());
+ serverLocator.setConfirmationWindowSize(perfParams.getConfirmationWindow());
+ serverLocator.setProducerWindowSize(perfParams.getProducerWindow());
+ serverLocator.setConsumerWindowSize(perfParams.getConsumerWindow());
+ serverLocator.setAckBatchSize(perfParams.getBatchSize());
- factory.setAckBatchSize(perfParams.getBatchSize());
+ serverLocator.setBlockOnAcknowledge(perfParams.isBlockOnACK());
+ serverLocator.setBlockOnDurableSend(perfParams.isBlockOnPersistent());
+ factory = serverLocator.createSessionFactory();
- factory.setBlockOnAcknowledge(perfParams.isBlockOnACK());
- factory.setBlockOnDurableSend(perfParams.isBlockOnPersistent());
}
private void displayAverage(final long numberOfMessages, final long start, final long end)
@@ -293,11 +294,11 @@
}
finally
{
- if (session != null)
+ if (factory != null)
{
try
{
- session.close();
+ factory.close();
}
catch (Exception e)
{
13 years, 11 months
JBoss hornetq SVN: r9411 - in branches/2_2_0_HA_Improvements/examples/core: embedded/src/org/hornetq/core/example and 4 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-19 08:12:07 -0400 (Mon, 19 Jul 2010)
New Revision: 9411
Modified:
branches/2_2_0_HA_Improvements/examples/core/embedded-remote/
branches/2_2_0_HA_Improvements/examples/core/embedded-remote/readme.html
branches/2_2_0_HA_Improvements/examples/core/embedded-remote/src/org/hornetq/core/example/EmbeddedRemoteExample.java
branches/2_2_0_HA_Improvements/examples/core/embedded/
branches/2_2_0_HA_Improvements/examples/core/embedded/readme.html
branches/2_2_0_HA_Improvements/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java
branches/2_2_0_HA_Improvements/examples/core/microcontainer/
branches/2_2_0_HA_Improvements/examples/core/microcontainer/readme.html
branches/2_2_0_HA_Improvements/examples/core/microcontainer/src/org/hornetq/core/example/EmbeddedMicroContainerExample.java
Log:
HA refactoring
* fix core examples to use ServerLocator API
Property changes on: branches/2_2_0_HA_Improvements/examples/core/embedded
___________________________________________________________________
Name: svn:ignore
- logs
build
+ logs
build
data
Modified: branches/2_2_0_HA_Improvements/examples/core/embedded/readme.html
===================================================================
--- branches/2_2_0_HA_Improvements/examples/core/embedded/readme.html 2010-07-19 12:10:34 UTC (rev 9410)
+++ branches/2_2_0_HA_Improvements/examples/core/embedded/readme.html 2010-07-19 12:12:07 UTC (rev 9411)
@@ -39,7 +39,8 @@
<li>As we are not using a JNDI environment we instantiate the objects directly</li>
<pre class="prettyprint">
- ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ ClientSessionFactory sf = serverLocator.createSessionFactory();
</pre>
<li>Create a Core Queue</li>
@@ -80,12 +81,9 @@
<li>Be sure to close our resources!</li>
<pre class="prettyprint">
- finally
+ if (sf != null)
{
- if (connection != null)
- {
- connection.close();
- }
+ sf.close();
}
</pre>
Modified: branches/2_2_0_HA_Improvements/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java
===================================================================
--- branches/2_2_0_HA_Improvements/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java 2010-07-19 12:10:34 UTC (rev 9410)
+++ branches/2_2_0_HA_Improvements/examples/core/embedded/src/org/hornetq/core/example/EmbeddedExample.java 2010-07-19 12:12:07 UTC (rev 9411)
@@ -13,10 +13,15 @@
package org.hornetq.core.example;
import java.util.Date;
-import java.util.HashMap;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -51,8 +56,9 @@
server.start();
// Step 3. As we are not using a JNDI environment we instantiate the objects directly
- ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new TransportConfiguration(InVMConnectorFactory.class.getName()));
-
+ ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(InVMConnectorFactory.class.getName()));
+ ClientSessionFactory sf = serverLocator.createSessionFactory();
+
// Step 4. Create a core queue
ClientSession coreSession = sf.createSession(false, false, false);
@@ -94,9 +100,9 @@
finally
{
// Step 9. Be sure to close our resources!
- if (session != null)
+ if (sf != null)
{
- session.close();
+ sf.close();
}
// Step 10. Stop the server
@@ -109,13 +115,4 @@
System.exit(-1);
}
}
-
- {
- HashMap<String, Object> map = new HashMap<String, Object>();
- map.put("host", "localhost");
- map.put("port", 5445);
- TransportConfiguration config = new TransportConfiguration(InVMConnectorFactory.class.getName(), map);
- ClientSessionFactory sf = HornetQClient.createClientSessionFactory(config);
- }
-
}
Property changes on: branches/2_2_0_HA_Improvements/examples/core/embedded-remote
___________________________________________________________________
Name: svn:ignore
- build
+ build
data
Modified: branches/2_2_0_HA_Improvements/examples/core/embedded-remote/readme.html
===================================================================
--- branches/2_2_0_HA_Improvements/examples/core/embedded-remote/readme.html 2010-07-19 12:10:34 UTC (rev 9410)
+++ branches/2_2_0_HA_Improvements/examples/core/embedded-remote/readme.html 2010-07-19 12:12:07 UTC (rev 9411)
@@ -52,7 +52,8 @@
<li>As we are not using a JNDI environment we instantiate the objects directly</li>
<pre class="prettyprint">
- ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ ClientSessionFactory sf = serverLocator.createSessionFactory();
</pre>
<li>Create a Core Queue</li>
@@ -93,12 +94,9 @@
<li>Be sure to close our resources!</li>
<pre class="prettyprint">
- finally
+ if (sf != null)
{
- if (connection != null)
- {
- connection.close();
- }
+ sf.close();
}
</pre>
Modified: branches/2_2_0_HA_Improvements/examples/core/embedded-remote/src/org/hornetq/core/example/EmbeddedRemoteExample.java
===================================================================
--- branches/2_2_0_HA_Improvements/examples/core/embedded-remote/src/org/hornetq/core/example/EmbeddedRemoteExample.java 2010-07-19 12:10:34 UTC (rev 9410)
+++ branches/2_2_0_HA_Improvements/examples/core/embedded-remote/src/org/hornetq/core/example/EmbeddedRemoteExample.java 2010-07-19 12:12:07 UTC (rev 9411)
@@ -15,7 +15,13 @@
import java.util.Date;
import org.hornetq.api.core.TransportConfiguration;
-import org.hornetq.api.core.client.*;
+import org.hornetq.api.core.client.ClientConsumer;
+import org.hornetq.api.core.client.ClientMessage;
+import org.hornetq.api.core.client.ClientProducer;
+import org.hornetq.api.core.client.ClientSession;
+import org.hornetq.api.core.client.ClientSessionFactory;
+import org.hornetq.api.core.client.HornetQClient;
+import org.hornetq.api.core.client.ServerLocator;
import org.hornetq.common.example.SpawnedVMSupport;
import org.hornetq.core.remoting.impl.netty.NettyConnectorFactory;
@@ -42,7 +48,8 @@
process = EmbeddedRemoteExample.startRemoteEmbedded();
// Step 4. As we are not using a JNDI environment we instantiate the objects directly
- ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ ClientSessionFactory sf = serverLocator.createSessionFactory();
// Step 5. Create a core queue
ClientSession coreSession = sf.createSession(false, false, false);
@@ -85,9 +92,9 @@
finally
{
// Step 10. Be sure to close our resources!
- if (session != null)
+ if (sf != null)
{
- session.close();
+ sf.close();
}
if (process != null)
Property changes on: branches/2_2_0_HA_Improvements/examples/core/microcontainer
___________________________________________________________________
Name: svn:ignore
- build
+ build
data
Modified: branches/2_2_0_HA_Improvements/examples/core/microcontainer/readme.html
===================================================================
--- branches/2_2_0_HA_Improvements/examples/core/microcontainer/readme.html 2010-07-19 12:10:34 UTC (rev 9410)
+++ branches/2_2_0_HA_Improvements/examples/core/microcontainer/readme.html 2010-07-19 12:12:07 UTC (rev 9411)
@@ -24,7 +24,8 @@
<li>As we are not using a JNDI environment we instantiate the objects directly</li>
<pre class="prettyprint">
- ClientSessionFactory sf = HornetQClient.createClientSessionFactory (new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ ClientSessionFactory sf = serverLocator.createSessionFactory();
</pre>
<li>Create a Core Queue</li>
@@ -65,12 +66,9 @@
<li>Be sure to close our resources!</li>
<pre class="prettyprint">
- finally
+ if (sf != null)
{
- if (connection != null)
- {
- connection.close();
- }
+ sf.close();
}
</pre>
Modified: branches/2_2_0_HA_Improvements/examples/core/microcontainer/src/org/hornetq/core/example/EmbeddedMicroContainerExample.java
===================================================================
--- branches/2_2_0_HA_Improvements/examples/core/microcontainer/src/org/hornetq/core/example/EmbeddedMicroContainerExample.java 2010-07-19 12:10:34 UTC (rev 9410)
+++ branches/2_2_0_HA_Improvements/examples/core/microcontainer/src/org/hornetq/core/example/EmbeddedMicroContainerExample.java 2010-07-19 12:12:07 UTC (rev 9411)
@@ -42,7 +42,8 @@
hornetQ.run();
// Step 2. As we are not using a JNDI environment we instantiate the objects directly
- ClientSessionFactory sf = HornetQClient.createClientSessionFactory(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ ServerLocator serverLocator = HornetQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName()));
+ ClientSessionFactory sf = serverLocator.createSessionFactory();
// Step 3. Create a core queue
ClientSession coreSession = sf.createSession(false, false, false);
@@ -86,9 +87,9 @@
finally
{
// Step 8. Be sure to close our resources!
- if (session != null)
+ if (sf != null)
{
- session.close();
+ sf.close();
}
// Step 9. Shutdown the container
13 years, 11 months
JBoss hornetq SVN: r9410 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-19 08:10:34 -0400 (Mon, 19 Jul 2010)
New Revision: 9410
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
HA refactoring
* do not close the underlying connection until close() is called
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-07-19 12:09:09 UTC (rev 9409)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2010-07-19 12:10:34 UTC (rev 9410)
@@ -356,8 +356,6 @@
synchronized (failoverLock)
{
sessions.remove(session);
-
- checkCloseConnection();
}
}
}
@@ -394,6 +392,26 @@
return;
}
+ synchronized (createSessionLock)
+ {
+ synchronized (failoverLock)
+ {
+ for (ClientSession session : sessions)
+ {
+ try
+ {
+ session.close();
+ }
+ catch (HornetQException e)
+ {
+ log.warn("Unable to close session", e);
+ }
+ }
+
+ checkCloseConnection();
+ }
+ }
+
causeExit();
closed = true;
13 years, 11 months
JBoss hornetq SVN: r9409 - branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-19 08:09:09 -0400 (Mon, 19 Jul 2010)
New Revision: 9409
Modified:
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
Log:
HA refactoring
* do not announce the node if there are no cluster connections
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-19 09:13:55 UTC (rev 9408)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/server/cluster/impl/ClusterManagerImpl.java 2010-07-19 12:09:09 UTC (rev 9409)
@@ -152,8 +152,11 @@
// Now announce presence
- announceNode();
-
+ if (clusterConnections.size() > 0)
+ {
+ announceNode();
+ }
+
started = true;
}
@@ -264,7 +267,10 @@
{
backup = false;
- announceNode();
+ if (clusterConnections.size() > 0)
+ {
+ announceNode();
+ }
}
}
13 years, 11 months
JBoss hornetq SVN: r9408 - branches.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-19 05:13:55 -0400 (Mon, 19 Jul 2010)
New Revision: 9408
Removed:
branches/HORNETQ-129_STOMP_protocol/
Log:
removed STOMP integration branch as it was merged in the trunk
13 years, 11 months
JBoss hornetq SVN: r9407 - in trunk: src/main/org/hornetq/core/server/impl and 4 other directories.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-19 05:06:52 -0400 (Mon, 19 Jul 2010)
New Revision: 9407
Modified:
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/transaction/Transaction.java
trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-442: Out of Order delivery with depaging during a transaction
* ensure that during a tx completion, messages routed to an address which is depaging keep the transactional context
and remain in order
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -14,8 +14,6 @@
package org.hornetq.core.postoffice.impl;
import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -60,9 +58,9 @@
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
+import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
-import org.hornetq.core.transaction.Transaction.State;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.TypedProperties;
import org.hornetq.utils.UUIDGenerator;
@@ -1104,20 +1102,14 @@
private class PageMessageOperation implements TransactionOperation
{
private final List<ServerMessage> messagesToPage = new ArrayList<ServerMessage>();
-
+
+ private Transaction subTX = null;
+
void addMessageToPage(final ServerMessage message)
{
messagesToPage.add(message);
}
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#getDistinctQueues()
- */
- public Collection<Queue> getDistinctQueues()
- {
- return Collections.emptySet();
- }
-
public void afterCommit(final Transaction tx)
{
// If part of the transaction goes to the queue, and part goes to paging, we can't let depage start for the
@@ -1130,10 +1122,19 @@
{
pageTransaction.commit();
}
+
+ if (subTX != null)
+ {
+ subTX.afterCommit();
+ }
}
public void afterPrepare(final Transaction tx)
{
+ if (subTX != null)
+ {
+ subTX.afterPrepare();
+ }
}
public void afterRollback(final Transaction tx)
@@ -1144,6 +1145,11 @@
{
pageTransaction.rollback();
}
+
+ if (subTX != null)
+ {
+ subTX.afterRollback();
+ }
}
public void beforeCommit(final Transaction tx) throws Exception
@@ -1152,15 +1158,30 @@
{
pageMessages(tx);
}
+
+ if (subTX != null)
+ {
+ subTX.beforeCommit();
+ }
+
}
public void beforePrepare(final Transaction tx) throws Exception
{
pageMessages(tx);
+
+ if (subTX != null)
+ {
+ subTX.beforePrepare();
+ }
}
public void beforeRollback(final Transaction tx) throws Exception
{
+ if (subTX != null)
+ {
+ subTX.beforeRollback();
+ }
}
private void pageMessages(final Transaction tx) throws Exception
@@ -1201,9 +1222,13 @@
else
{
// This could happen when the PageStore left the pageState
-
- // TODO is this correct - don't we lose transactionality here???
- route(message, false);
+ // we create a copy of the transaction so that messages are routed with the same tx ID.
+ // but we can not use directly the tx as it has already its own set of TransactionOperations
+ if (subTX == null)
+ {
+ subTX = tx.copy();
+ }
+ route(message, subTX, false);
}
first = false;
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -1385,7 +1385,7 @@
return status;
}
- private void postAcknowledge(final MessageReference ref) throws Exception
+ private void postAcknowledge(final MessageReference ref)
{
final ServerMessage message = ref.getMessage();
@@ -1423,7 +1423,14 @@
queue.deliveringCount.decrementAndGet();
- message.decrementRefCount();
+ try
+ {
+ message.decrementRefCount();
+ }
+ catch (Exception e)
+ {
+ QueueImpl.log.warn("Unable to decrement reference counting", e);
+ }
}
void postRollback(final LinkedList<MessageReference> refs)
@@ -1500,7 +1507,7 @@
}
}
- public void afterCommit(final Transaction tx) throws Exception
+ public void afterCommit(final Transaction tx)
{
for (MessageReference ref : refsToAck)
{
Modified: trunk/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/Transaction.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/transaction/Transaction.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -22,7 +22,7 @@
*
* @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor@jboss.org>Andy Taylor</a>
- */
+ */
public interface Transaction
{
void prepare() throws Exception;
@@ -32,6 +32,10 @@
void commit(boolean onePhase) throws Exception;
void rollback() throws Exception;
+
+ /** Used for pages during commit.
+ * When paging messages we need to guarantee that they are in the same transaction (but not with the same set of TransactionOperation). */
+ Transaction copy();
int getOperationsCount();
@@ -64,9 +68,24 @@
void setContainsPersistent();
void setTimeout(int timeout);
+
+ // To be used by sub-contexts. Mainly on paging
+
+ void beforeCommit() throws Exception;
+
+ void beforeRollback() throws Exception;
+
+ void beforePrepare() throws Exception;;
+
+ void afterPrepare();
+
+ void afterCommit();
+
+ void afterRollback();
static enum State
{
ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY
}
+
}
Modified: trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -24,13 +24,16 @@
{
void beforePrepare(Transaction tx) throws Exception;
+ /** After prepare shouldn't throw any exception. Any verification has to be done on before prepare */
+ void afterPrepare(Transaction tx);
+
void beforeCommit(Transaction tx) throws Exception;
+ /** After commit shouldn't throw any exception. Any verification has to be done on before commit */
+ void afterCommit(Transaction tx);
+
void beforeRollback(Transaction tx) throws Exception;
- void afterPrepare(Transaction tx) throws Exception;
-
- void afterCommit(Transaction tx) throws Exception;
-
- void afterRollback(Transaction tx) throws Exception;
+ /** After rollback shouldn't throw any exception. Any verification has to be done on before rollback */
+ void afterRollback(Transaction tx);
}
Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -73,6 +73,20 @@
this.timeoutSeconds = timeoutSeconds;
}
+ /** Used for copying */
+ private TransactionImpl(final TransactionImpl other)
+ {
+ this.storageManager = other.storageManager;
+
+ this.xid = other.xid;
+
+ this.id = other.id;
+
+ this.createTime = other.createTime;
+
+ this.timeoutSeconds = other.timeoutSeconds;
+ }
+
public TransactionImpl(final StorageManager storageManager)
{
this.storageManager = storageManager;
@@ -130,7 +144,7 @@
{
return createTime;
}
-
+
public boolean hasTimedOut(final long currentTime,final int defaultTimeout)
{
if(timeoutSeconds == - 1)
@@ -169,13 +183,7 @@
throw new IllegalStateException("Cannot prepare non XA transaction");
}
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
- {
- operation.beforePrepare(this);
- }
- }
+ beforePrepare();
storageManager.prepare(id, xid);
@@ -195,22 +203,7 @@
public void done()
{
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
- {
- try
- {
- operation.afterPrepare(TransactionImpl.this);
- }
- catch (Exception e)
- {
- // https://jira.jboss.org/jira/browse/HORNETQ-188
- // After commit shouldn't throw an exception
- TransactionImpl.log.warn(e.getMessage(), e);
- }
- }
- }
+ afterPrepare();
}
});
}
@@ -252,15 +245,9 @@
throw new IllegalStateException("Transaction is in invalid state " + state);
}
}
+
+ beforeCommit();
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
- {
- operation.beforeCommit(this);
- }
- }
-
if (containsPersistent || xid != null && state == State.PREPARED)
{
storageManager.commit(id);
@@ -285,22 +272,7 @@
public void done()
{
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
- {
- try
- {
- operation.afterCommit(TransactionImpl.this);
- }
- catch (Exception e)
- {
- // https://jira.jboss.org/jira/browse/HORNETQ-188
- // After commit shouldn't throw an exception
- TransactionImpl.log.warn(e.getMessage(), e);
- }
- }
- }
+ afterCommit();
}
});
@@ -326,13 +298,7 @@
}
}
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
- {
- operation.beforeRollback(this);
- }
- }
+ beforeRollback();
doRollback();
@@ -353,22 +319,7 @@
public void done()
{
- if (operations != null)
- {
- for (TransactionOperation operation : operations)
- {
- try
- {
- operation.afterRollback(TransactionImpl.this);
- }
- catch (Exception e)
- {
- // https://jira.jboss.org/jira/browse/HORNETQ-188
- // After commit shouldn't throw an exception
- TransactionImpl.log.warn(e.getMessage(), e);
- }
- }
- }
+ afterRollback();
}
});
}
@@ -471,4 +422,75 @@
}
}
+ public Transaction copy()
+ {
+ return new TransactionImpl(this);
+ }
+
+ public void afterCommit()
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ operation.afterCommit(this);
+ }
+ }
+ }
+
+ public void afterRollback()
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ operation.afterRollback(this);
+ }
+ }
+ }
+
+ public void beforeCommit() throws Exception
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ operation.beforeCommit(this);
+ }
+ }
+ }
+
+ public void beforePrepare() throws Exception
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ operation.beforePrepare(this);
+ }
+ }
+ }
+
+ public void beforeRollback() throws Exception
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ operation.beforeRollback(this);
+ }
+ }
+ }
+
+ public void afterPrepare()
+ {
+ if (operations != null)
+ {
+ for (TransactionOperation operation : operations)
+ {
+ operation.afterPrepare(this);
+ }
+ }
+ }
+
}
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -247,7 +247,7 @@
server.start();
- final int numberOfIntegers = 256;
+ final int messageSize = 1024; // 1k
try
{
@@ -263,7 +263,7 @@
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
- byte[] body = new byte[DataConstants.SIZE_INT * numberOfIntegers];
+ byte[] body = new byte[messageSize];
// HornetQBuffer bodyLocal = HornetQChannelBuffers.buffer(DataConstants.SIZE_INT * numberOfIntegers);
ClientMessage message = null;
@@ -372,7 +372,7 @@
* Test under discussion at : http://community.jboss.org/thread/154061?tstart=0
*
*/
- public void disabled_testDepageDuringTransaction2() throws Exception
+ public void testDepageDuringTransaction2() throws Exception
{
boolean IS_DURABLE_MESSAGE = true;
clearData();
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-07-16 11:12:45 UTC (rev 9406)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2010-07-19 09:06:52 UTC (rev 9407)
@@ -292,6 +292,35 @@
}
+ public Transaction copy()
+ {
+ return null;
+ }
+
+ public void afterCommit()
+ {
+ }
+
+ public void afterPrepare()
+ {
+ }
+
+ public void afterRollback()
+ {
+ }
+
+ public void beforeCommit() throws Exception
+ {
+ }
+
+ public void beforePrepare() throws Exception
+ {
+ }
+
+ public void beforeRollback() throws Exception
+ {
+ }
+
}
class FakeMessage implements ServerMessage
13 years, 11 months
JBoss hornetq SVN: r9406 - in branches/2_2_0_HA_Improvements: src/main/org/hornetq/core/deployers/impl and 2 other directories.
by do-not-reply@jboss.org
Author: ataylor
Date: 2010-07-16 07:12:45 -0400 (Fri, 16 Jul 2010)
New Revision: 9406
Modified:
branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd
branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
branches/2_2_0_HA_Improvements/tests/config/ConfigurationTest-full-config.xml
branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
Log:
Fixed the full test configuration file and the configuration tests.
Modified: branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd
===================================================================
--- branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd 2010-07-16 08:13:34 UTC (rev 9405)
+++ branches/2_2_0_HA_Improvements/src/config/common/schema/hornetq-configuration.xsd 2010-07-16 11:12:45 UTC (rev 9406)
@@ -213,7 +213,7 @@
</xsd:element>
<xsd:element maxOccurs="1" minOccurs="0" ref="broadcast-period">
</xsd:element>
- <xsd:element maxOccurs="unbounded" minOccurs="0" ref="connector-ref">
+ <xsd:element maxOccurs="unbounded" minOccurs="0" name="connector-ref" type="xsd:string">
</xsd:element>
</xsd:sequence>
<xsd:attribute name="name" type="xsd:ID" use="required"/>
Modified: branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java
===================================================================
--- branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-07-16 08:13:34 UTC (rev 9405)
+++ branches/2_2_0_HA_Improvements/src/main/org/hornetq/core/deployers/impl/FileConfigurationParser.java 2010-07-16 11:12:45 UTC (rev 9406)
@@ -879,7 +879,7 @@
if (child.getNodeName().equals("connector-ref"))
{
- String connectorName = child.getAttributes().getNamedItem("connector-name").getNodeValue();
+ String connectorName = XMLConfigurationUtil.getString(e, "connector-ref", null, Validators.NOT_NULL_OR_EMPTY);
connectorNames.add(connectorName);
}
@@ -977,19 +977,7 @@
}
else if (child.getNodeName().equals("static-connectors"))
{
- NodeList children2 = child.getChildNodes();
-
- for (int k = 0; k < children2.getLength(); k++)
- {
- Node child2 = children.item(k);
-
- if (child2.getNodeName().equals("connector-ref"))
- {
- String connName = child.getAttributes().getNamedItem("connector-name").getNodeValue();
-
- staticConnectorNames.add(connName);
- }
- }
+ getStaticConnectors(staticConnectorNames, child);
}
}
@@ -1115,19 +1103,7 @@
}
else if (child.getNodeName().equals("static-connectors"))
{
- NodeList children2 = child.getChildNodes();
-
- for (int k = 0; k < children2.getLength(); k++)
- {
- Node child2 = children.item(k);
-
- if (child2.getNodeName().equals("connector-ref"))
- {
- String connectorName = child.getAttributes().getNamedItem("connector-name").getNodeValue();
-
- staticConnectorNames.add(connectorName);
- }
- }
+ getStaticConnectors(staticConnectorNames, child);
}
}
@@ -1175,6 +1151,20 @@
mainConfig.getBridgeConfigurations().add(config);
}
+ private void getStaticConnectors(List<String> staticConnectorNames, Node child)
+ {
+ NodeList children2 = ((Element) child).getElementsByTagName("connector-ref");
+
+ for (int k = 0; k < children2.getLength(); k++)
+ {
+ Element child2 = (Element) children2.item(k);
+
+ String connectorName = child2.getChildNodes().item(0).getNodeValue();
+
+ staticConnectorNames.add(connectorName);
+ }
+ }
+
private void parseDivertConfiguration(final Element e, final Configuration mainConfig)
{
String name = e.getAttribute("name");
Modified: branches/2_2_0_HA_Improvements/tests/config/ConfigurationTest-full-config.xml
===================================================================
--- branches/2_2_0_HA_Improvements/tests/config/ConfigurationTest-full-config.xml 2010-07-16 08:13:34 UTC (rev 9405)
+++ branches/2_2_0_HA_Improvements/tests/config/ConfigurationTest-full-config.xml 2010-07-16 11:12:45 UTC (rev 9406)
@@ -1,7 +1,7 @@
<configuration
xmlns="urn:hornetq"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="urn:hornetq ../../src/schemas/hornetq-configuration.xsd">
+ xsi:schemaLocation="urn:hornetq ../../src/config/common/schema/hornetq-configuration.xsd">
<clustered>true</clustered>
<scheduled-thread-pool-max-size>12345</scheduled-thread-pool-max-size>
<thread-pool-max-size>54321</thread-pool-max-size>
@@ -53,8 +53,7 @@
<class-name>org.hornetq.tests.unit.core.config.impl.TestInterceptor1</class-name>
<class-name>org.hornetq.tests.unit.core.config.impl.TestInterceptor2</class-name>
</remoting-interceptors>
-
- <backup-connector-ref connector-name="backup-connector" />
+
<connectors>
<connector name="connector1">
<factory-class>org.hornetq.tests.unit.core.config.impl.TestConnectorFactory1</factory-class>
@@ -68,11 +67,6 @@
<param key="b1" value="w1"/>
<param key="b2" value="234"/>
</connector>
- <connector name="backup-connector">
- <factory-class>org.hornetq.tests.unit.core.config.impl.TestConnectorFactory3</factory-class>
- <param key="c1" value="x1"/>
- <param key="c2" value="345"/>
- </connector>
</connectors>
<acceptors>
<acceptor name="acceptor1">
@@ -92,14 +86,14 @@
<group-address>192.168.0.120</group-address>
<group-port>11999</group-port>
<broadcast-period>12345</broadcast-period>
- <connector-ref connector-name="connector1"/>
+ <connector-ref>connector1</connector-ref>
</broadcast-group>
<broadcast-group name="bg2">
<local-bind-port>12999</local-bind-port>
<group-address>192.168.0.121</group-address>
<group-port>13999</group-port>
<broadcast-period>23456</broadcast-period>
- <connector-ref connector-name="connector2" backup-connector-name="backup-connector"/>
+ <connector-ref>connector2</connector-ref>
</broadcast-group>
</broadcast-groups>
<discovery-groups>
@@ -157,7 +151,9 @@
<reconnect-attempts>2</reconnect-attempts>
<failover-on-server-shutdown>false</failover-on-server-shutdown>
<use-duplicate-detection>true</use-duplicate-detection>
- <connector-ref connector-name="connector1"/>
+ <static-connectors>
+ <connector-ref>connector1</connector-ref>
+ </static-connectors>
</bridge>
<bridge name="bridge2">
<queue-name>queue2</queue-name>
@@ -168,15 +164,19 @@
<cluster-connections>
<cluster-connection name="cluster-connection1">
<address>queues1</address>
+ <connector-ref>connector1</connector-ref>
<retry-interval>3</retry-interval>
<use-duplicate-detection>true</use-duplicate-detection>
<forward-when-no-consumers>false</forward-when-no-consumers>
<max-hops>1</max-hops>
- <connector-ref connector-name="connector1" backup-connector-name="backup-connector"/>
- <connector-ref connector-name="connector2"/>
+ <static-connectors>
+ <connector-ref>connector1</connector-ref>
+ <connector-ref>connector2</connector-ref>
+ </static-connectors>
</cluster-connection>
<cluster-connection name="cluster-connection2">
<address>queues2</address>
+ <connector-ref>connector2</connector-ref>
<retry-interval>4</retry-interval>
<use-duplicate-detection>false</use-duplicate-detection>
<forward-when-no-consumers>true</forward-when-no-consumers>
Modified: branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java
===================================================================
--- branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2010-07-16 08:13:34 UTC (rev 9405)
+++ branches/2_2_0_HA_Improvements/tests/src/org/hornetq/tests/unit/core/config/impl/FileConfigurationTest.java 2010-07-16 11:12:45 UTC (rev 9406)
@@ -93,7 +93,7 @@
.contains("org.hornetq.tests.unit.core.config.impl.TestInterceptor2"));
- Assert.assertEquals(3, conf.getConnectorConfigurations().size());
+ Assert.assertEquals(2, conf.getConnectorConfigurations().size());
TransportConfiguration tc = conf.getConnectorConfigurations().get("connector1");
Assert.assertNotNull(tc);
@@ -109,12 +109,6 @@
Assert.assertEquals("w1", tc.getParams().get("b1"));
Assert.assertEquals("234", tc.getParams().get("b2"));
- tc = conf.getConnectorConfigurations().get("backup-connector");
- Assert.assertNotNull(tc);
- Assert.assertEquals("org.hornetq.tests.unit.core.config.impl.TestConnectorFactory3", tc.getFactoryClassName());
- Assert.assertEquals("x1", tc.getParams().get("c1"));
- Assert.assertEquals("345", tc.getParams().get("c2"));
-
Assert.assertEquals(2, conf.getAcceptorConfigurations().size());
for (TransportConfiguration ac : conf.getAcceptorConfigurations())
{
@@ -239,7 +233,6 @@
Assert.assertEquals(false, ccc.isForwardWhenNoConsumers());
Assert.assertEquals(1, ccc.getMaxHops());
Assert.assertEquals("connector1", ccc.getStaticConnectors().get(0));
- Assert.assertEquals("backup-connector", ccc.getStaticConnectors().get(1));
Assert.assertEquals("connector2", ccc.getStaticConnectors().get(1));
Assert.assertEquals(null, ccc.getDiscoveryGroupName());
}
13 years, 11 months
JBoss hornetq SVN: r9405 - trunk/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2010-07-16 04:13:34 -0400 (Fri, 16 Jul 2010)
New Revision: 9405
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
fix PagingTest disabled_testDepageDuringTransaction2
* message #0 was sent before the queue was created and would not be consumed
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-07-16 04:47:08 UTC (rev 9404)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2010-07-16 08:13:34 UTC (rev 9405)
@@ -400,22 +400,19 @@
byte[] body = new byte[messageSize];
ClientSession sessionTransacted = sf.createSession(null, null, false, false, false, false, 0);
-
ClientProducer producerTransacted = sessionTransacted.createProducer(PagingTest.ADDRESS);
-
+
+ ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
+ session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
+
ClientMessage firstMessage = sessionTransacted.createMessage(IS_DURABLE_MESSAGE);
firstMessage.getBodyBuffer().writeBytes(body);
firstMessage.putIntProperty(new SimpleString("id"), 0);
producerTransacted.send(firstMessage);
- ClientSession session = sf.createSession(null, null, false, true, true, false, 0);
-
- session.createQueue(PagingTest.ADDRESS, PagingTest.ADDRESS, null, true);
-
ClientProducer producer = session.createProducer(PagingTest.ADDRESS);
-
ClientMessage message = null;
int numberOfMessages = 0;
@@ -482,8 +479,9 @@
Integer messageID = (Integer)message.getObjectProperty(new SimpleString("id"));
+// System.out.println(messageID);
Assert.assertNotNull(messageID);
- Assert.assertEquals("message received out of order", messageID.intValue(), i);
+ Assert.assertEquals("message received out of order", i, messageID.intValue());
message.acknowledge();
}
13 years, 11 months
JBoss hornetq SVN: r9404 - in trunk: tests/src/org/hornetq/tests/integration/journal and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2010-07-16 00:47:08 -0400 (Fri, 16 Jul 2010)
New Revision: 9404
Modified:
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
Log:
https://jira.jboss.org/browse/HORNETQ-440 - Journal fix: handle record deletes issued while compacting is in progress
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-15 18:23:17 UTC (rev 9403)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2010-07-16 04:47:08 UTC (rev 9404)
@@ -904,15 +904,24 @@
try
{
- JournalRecord record = records.remove(id);
+ JournalRecord record = null;
+
+ if (compactor == null)
+ {
+ record = records.remove(id);
- if (record == null)
- {
- if (!(compactor != null && compactor.lookupRecord(id)))
+ if (record == null)
{
throw new IllegalStateException("Cannot find add info " + id);
}
}
+ else
+ {
+ if (!records.containsKey(id) && !compactor.lookupRecord(id))
+ {
+ throw new IllegalStateException("Cannot find add info " + id + " on compactor or current records");
+ }
+ }
JournalInternalRecord deleteRecord = new JournalDeleteRecord(id);
@@ -1464,7 +1473,7 @@
return;
}
- autoReclaim = false;
+ setAutoReclaim(false);
// We need to move to the next file, as we need a clear start for negatives and positives counts
moveNextFile(true);
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-07-15 18:23:17 UTC (rev 9403)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalTransaction.java 2010-07-16 04:47:08 UTC (rev 9404)
@@ -317,18 +317,18 @@
{
for (JournalUpdate trDelete : neg)
{
- JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
-
- if (posFiles != null)
+ if (compactor != null)
{
- posFiles.delete(trDelete.file);
+ compactor.addCommandDelete(trDelete.id, trDelete.file);
}
- else if (compactor != null && compactor.lookupRecord(trDelete.id))
+ else
{
- // This is a case where the transaction was opened after compacting was started,
- // but the commit arrived while compacting was working
- // We need to cache the counter update, so compacting will take the correct files when it is done
- compactor.addCommandDelete(trDelete.id, trDelete.file);
+ JournalImpl.JournalRecord posFiles = journal.getRecords().remove(trDelete.id);
+
+ if (posFiles != null)
+ {
+ posFiles.delete(trDelete.file);
+ }
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-15 18:23:17 UTC (rev 9403)
+++ trunk/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2010-07-16 04:47:08 UTC (rev 9404)
@@ -563,8 +563,7 @@
}
- // This test is under investigation... disabled for now
- public void _testCompactAddAndUpdateFollowedByADelete() throws Exception
+ public void testCompactAddAndUpdateFollowedByADelete() throws Exception
{
setup(2, 60 * 1024, false);
@@ -601,15 +600,210 @@
startJournal();
load();
+
+ long consumerTX = idGen.generateID();
long firstID = idGen.generateID();
+
+ long appendTX = idGen.generateID();
+
+ long addedRecord = idGen.generateID();
+
+ addTx(consumerTX, firstID);
+
+ Thread tCompact = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+ tCompact.start();
+
+
+ reusableLatchDone.waitCompletion();
+
+ addTx(appendTX, addedRecord);
+
+ commit(appendTX);
+
+ updateTx(consumerTX, addedRecord);
+
+ commit(consumerTX);
+
+ delete(addedRecord);
+
+ reusableLatchWait.down();
+
+ tCompact.join();
+
+ journal.forceMoveNextFile();
+
+ long newRecord = idGen.generateID();
+ add(newRecord);
+ update(newRecord);
+
+ journal.compact();
+
+ System.out.println("Debug after compact\n" + journal.debug());
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ public void testCompactAddAndUpdateFollowedByADelete2() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+
+ final VariableLatch reusableLatchDone = new VariableLatch();
+ reusableLatchDone.up();
+ final VariableLatch reusableLatchWait = new VariableLatch();
+ reusableLatchWait.up();
+
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+
+ @Override
+ public void onCompactDone()
+ {
+ reusableLatchDone.down();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ reusableLatchWait.waitCompletion();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Done");
+ }
+ };
+
+ journal.setAutoReclaim(false);
+
+ startJournal();
+ load();
+
+ long firstID = idGen.generateID();
+
long consumerTX = idGen.generateID();
long appendTX = idGen.generateID();
long addedRecord = idGen.generateID();
+ addTx(consumerTX, firstID);
+
+
+ Thread tCompact = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+
+ tCompact.start();
+
+ reusableLatchDone.waitCompletion();
+
+ addTx(appendTX, addedRecord);
+ commit(appendTX);
+ updateTx(consumerTX, addedRecord);
+ commit(consumerTX);
+
+ long deleteTXID = idGen.generateID();
+
+ deleteTx(deleteTXID, addedRecord);
+
+ commit(deleteTXID);
+
+ reusableLatchWait.down();
+
+ tCompact.join();
+
+ journal.forceMoveNextFile();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+ public void testCompactAddAndUpdateFollowedByADelete3() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+
+ final VariableLatch reusableLatchDone = new VariableLatch();
+ reusableLatchDone.up();
+ final VariableLatch reusableLatchWait = new VariableLatch();
+ reusableLatchWait.up();
+
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+
+ @Override
+ public void onCompactDone()
+ {
+ reusableLatchDone.down();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ reusableLatchWait.waitCompletion();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Done");
+ }
+ };
+
+ journal.setAutoReclaim(false);
+
+ startJournal();
+ load();
+
+ long firstID = idGen.generateID();
+
+ long consumerTX = idGen.generateID();
+
+ long addedRecord = idGen.generateID();
+
add(firstID);
updateTx(consumerTX, firstID);
@@ -637,11 +831,102 @@
reusableLatchDone.waitCompletion();
+ addTx(consumerTX, addedRecord);
+ commit(consumerTX);
+ delete(addedRecord);
+
+ reusableLatchWait.down();
+
+ tCompact.join();
+
+ journal.compact();
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
+ }
+
+
+ public void testCompactAddAndUpdateFollowedByADelete4() throws Exception
+ {
+
+ setup(2, 60 * 1024, false);
+
+ SimpleIDGenerator idGen = new SimpleIDGenerator(1000);
+
+
+ final VariableLatch reusableLatchDone = new VariableLatch();
+ reusableLatchDone.up();
+ final VariableLatch reusableLatchWait = new VariableLatch();
+ reusableLatchWait.up();
+
+ journal = new JournalImpl(fileSize, minFiles, 0, 0, fileFactory, filePrefix, fileExtension, maxAIO)
+ {
+
+ @Override
+ public void onCompactDone()
+ {
+ reusableLatchDone.down();
+ System.out.println("Waiting on Compact");
+ try
+ {
+ reusableLatchWait.waitCompletion();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ System.out.println("Done");
+ }
+ };
+
+ journal.setAutoReclaim(false);
+
+ startJournal();
+ load();
+
+ long consumerTX = idGen.generateID();
+
+ long firstID = idGen.generateID();
+
+ long appendTX = idGen.generateID();
+
+ long addedRecord = idGen.generateID();
+
+ Thread tCompact = new Thread()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ journal.compact();
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ };
+
+
+ tCompact.start();
+
+
+ reusableLatchDone.waitCompletion();
+
+ addTx(consumerTX, firstID);
+
addTx(appendTX, addedRecord);
- updateTx(appendTX, addedRecord);
+
commit(appendTX);
+
updateTx(consumerTX, addedRecord);
+
commit(consumerTX);
+
delete(addedRecord);
reusableLatchWait.down();
@@ -656,6 +941,8 @@
journal.compact();
+ System.out.println("Debug after compact\n" + journal.debug());
+
stopJournal();
createJournal();
startJournal();
13 years, 11 months