[hornetq-commits] JBoss hornetq SVN: r9024 - in trunk: src/main/org/hornetq/core/message/impl and 5 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Mar 31 07:31:38 EDT 2010
Author: timfox
Date: 2010-03-31 07:31:37 -0400 (Wed, 31 Mar 2010)
New Revision: 9024
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
Log:
Fixed default address optimisation.
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2010-03-31 11:31:37 UTC (rev 9024)
@@ -999,6 +999,9 @@
}
channel.setTransferring(false);
+
+ //Reset default address
+ defaultAddress = null;
}
catch (Throwable t)
{
Modified: trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/src/main/org/hornetq/core/message/impl/MessageImpl.java 2010-03-31 11:31:37 UTC (rev 9024)
@@ -76,7 +76,7 @@
protected ResetLimitWrappedHornetQBuffer bodyBuffer;
- protected boolean bufferValid;
+ protected volatile boolean bufferValid;
private int endOfBodyPosition = -1;
@@ -244,7 +244,7 @@
return address;
}
- public void setAddress(final SimpleString address)
+ public synchronized void setAddress(final SimpleString address)
{
if (this.address != address)
{
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2010-03-31 11:31:37 UTC (rev 9024)
@@ -548,7 +548,7 @@
}
SimpleString address = message.getAddress();
-
+
setPagingStore(message);
Object duplicateID = message.getObjectProperty(Message.HDR_DUPLICATE_DETECTION_ID);
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2010-03-31 11:31:37 UTC (rev 9024)
@@ -411,7 +411,7 @@
// nodes could have same queue ids
// Note we must copy since same message may get routed to other nodes which require different headers
message = message.copy();
-
+
// TODO - we can optimise this
Set<SimpleString> propNames = new HashSet<SimpleString>(message.getPropertyNames());
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2010-03-31 11:31:37 UTC (rev 9024)
@@ -911,10 +911,12 @@
{
long id = storageManager.generateUniqueID();
+ SimpleString address = message.getAddress();
+
message.setMessageID(id);
message.encodeMessageIDToBuffer();
-
- if (message.getAddress() == null)
+
+ if (address == null)
{
if (message.isDurable())
{
@@ -942,7 +944,7 @@
if (defaultAddress == null)
{
- defaultAddress = message.getAddress();
+ defaultAddress = address;
}
}
Modified: trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java
===================================================================
--- trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/src/main/org/hornetq/ra/inflow/HornetQActivation.java 2010-03-31 11:31:37 UTC (rev 9024)
@@ -95,7 +95,7 @@
private boolean isDeliveryTransacted;
private HornetQDestination destination;
-
+
/** The name of the temporary subscription name that all the sessions will share */
private SimpleString topicTemporaryQueue;
@@ -220,7 +220,6 @@
return isTopic;
}
-
/**
* Start the activation
*
@@ -235,7 +234,7 @@
deliveryActive.set(true);
ra.getWorkManager().scheduleWork(new SetupActivation());
}
-
+
/**
* @return the topicTemporaryQueue
*/
@@ -287,12 +286,25 @@
setupDestination();
for (int i = 0; i < spec.getMaxSessionInt(); i++)
{
- ClientSession session = setupSession();
+ ClientSession session = null;
- HornetQMessageHandler handler = new HornetQMessageHandler(this, session, i);
- handler.setup();
- session.start();
- handlers.add(handler);
+ try
+ {
+ session = setupSession();
+ HornetQMessageHandler handler = new HornetQMessageHandler(this, session, i);
+ handler.setup();
+ session.start();
+ handlers.add(handler);
+ }
+ catch (Exception e)
+ {
+ if (session != null)
+ {
+ session.close();
+ }
+
+ throw e;
+ }
}
HornetQActivation.log.info("Setup complete " + this);
@@ -426,11 +438,11 @@
// If there is no binding on naming, we will just create a new instance
if (isTopic)
{
- destination = (HornetQDestination) HornetQJMSClient.createTopic(destinationName.substring(destinationName.lastIndexOf('/') + 1));
+ destination = (HornetQDestination)HornetQJMSClient.createTopic(destinationName.substring(destinationName.lastIndexOf('/') + 1));
}
else
{
- destination = (HornetQDestination) HornetQJMSClient.createQueue(destinationName.substring(destinationName.lastIndexOf('/') + 1));
+ destination = (HornetQDestination)HornetQJMSClient.createQueue(destinationName.substring(destinationName.lastIndexOf('/') + 1));
}
}
}
@@ -452,11 +464,11 @@
{
if (Topic.class.getName().equals(spec.getDestinationType()))
{
- destination = (HornetQDestination) HornetQJMSClient.createTopic(spec.getDestination());
+ destination = (HornetQDestination)HornetQJMSClient.createTopic(spec.getDestination());
}
else
{
- destination = (HornetQDestination) HornetQJMSClient.createQueue(spec.getDestination());
+ destination = (HornetQDestination)HornetQJMSClient.createQueue(spec.getDestination());
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-03-31 10:38:37 UTC (rev 9023)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java 2010-03-31 11:31:37 UTC (rev 9024)
@@ -134,8 +134,6 @@
fail(session, latch);
- FailoverTest.log.info("got here 1");
-
ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
session.start();
@@ -158,7 +156,6 @@
}
}
- FailoverTest.log.info("closing session");
session.close();
Assert.assertEquals(0, sf.numSessions());
@@ -228,7 +225,6 @@
message.acknowledge();
}
- FailoverTest.log.info("closing session");
session.close();
Assert.assertEquals(0, sf.numSessions());
@@ -1166,7 +1162,6 @@
{
public void connectionFailed(final HornetQException me)
{
- FailoverTest.log.info("calling listener");
latch.countDown();
}
}
@@ -1224,8 +1219,6 @@
boolean ok = latch.await(1000, TimeUnit.MILLISECONDS);
- FailoverTest.log.info("waited for latch");
-
Assert.assertTrue(ok);
try
@@ -1976,24 +1969,19 @@
{
sf.addInterceptor(interceptor);
- FailoverTest.log.info("attempting commit");
session.commit();
}
catch (HornetQException e)
{
if (e.getCode() == HornetQException.TRANSACTION_ROLLED_BACK)
{
- FailoverTest.log.info("got transaction rolled back");
-
// Ok - now we retry the commit after removing the interceptor
sf.removeInterceptor(interceptor);
try
{
- FailoverTest.log.info("trying to commit again");
session.commit();
- FailoverTest.log.info("committed again ok");
failed = false;
}
@@ -2017,8 +2005,6 @@
fail(session, latch);
- FailoverTest.log.info("connection has failed");
-
committer.join();
Assert.assertFalse(committer.failed);
@@ -2270,7 +2256,6 @@
*/
protected void setBody(final int i, final ClientMessage message) throws Exception
{
- log.info("in failovertest:: setbody");
message.getBodyBuffer().writeString("message" + i);
}
More information about the hornetq-commits mailing list