Author: clebert.suconic(a)jboss.com
Date: 2009-12-03 09:05:00 -0500 (Thu, 03 Dec 2009)
New Revision: 8524
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
Log:
Removing waitOnOperations at QueueImpl + reverting commit on SessionFactory.
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-03 14:02:04 UTC (rev 8523)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-03 14:05:00 UTC (rev 8524)
@@ -264,54 +264,58 @@
}
}
- private void initialise() throws Exception
+ private synchronized void initialise() throws Exception
{
- setThreadPools();
+ if (!readOnly)
+ {
+ readOnly = true;
+ setThreadPools();
- instantiateLoadBalancingPolicy();
+ instantiateLoadBalancingPolicy();
- if (discoveryAddress != null)
- {
- InetAddress groupAddress = InetAddress.getByName(discoveryAddress);
+ if (discoveryAddress != null)
+ {
+ InetAddress groupAddress = InetAddress.getByName(discoveryAddress);
- discoveryGroup = new
DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
- discoveryAddress,
- groupAddress,
- discoveryPort,
- discoveryRefreshTimeout);
+ discoveryGroup = new
DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
+ discoveryAddress,
+ groupAddress,
+ discoveryPort,
+ discoveryRefreshTimeout);
- discoveryGroup.registerListener(this);
+ discoveryGroup.registerListener(this);
- discoveryGroup.start();
- }
- else if (staticConnectors != null)
- {
- for (Pair<TransportConfiguration, TransportConfiguration> pair :
staticConnectors)
+ discoveryGroup.start();
+ }
+ else if (staticConnectors != null)
{
- FailoverManager cm = new FailoverManagerImpl(this,
- pair.a,
- pair.b,
- failoverOnServerShutdown,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
+ for (Pair<TransportConfiguration, TransportConfiguration> pair :
staticConnectors)
+ {
+ FailoverManager cm = new FailoverManagerImpl(this,
+ pair.a,
+ pair.b,
+ failoverOnServerShutdown,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
- failoverManagerMap.put(pair, cm);
+ failoverManagerMap.put(pair, cm);
+ }
+
+ updatefailoverManagerArray();
}
-
- updatefailoverManagerArray();
+ else
+ {
+ throw new IllegalStateException("Before using a session factory you must
either set discovery address and port or " + "provide some static transport
configuration");
+ }
}
- else
- {
- throw new IllegalStateException("Before using a session factory you must
either set discovery address and port or " + "provide some static transport
configuration");
- }
}
// Static
@@ -1077,7 +1081,7 @@
}
}
- private synchronized ClientSession createSessionInternal(final String username,
+ private ClientSession createSessionInternal(final String username,
final String password,
final boolean xa,
final boolean autoCommitSends,
@@ -1100,8 +1104,6 @@
{
throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to
initialise session factory", e);
}
-
- readOnly = true;
}
if (discoveryGroup != null && !receivedBroadcast)
@@ -1115,32 +1117,35 @@
}
}
- int pos = loadBalancingPolicy.select(failoverManagerArray.length);
+ synchronized (this)
+ {
+ int pos = loadBalancingPolicy.select(failoverManagerArray.length);
- FailoverManager failoverManager = failoverManagerArray[pos];
+ FailoverManager failoverManager = failoverManagerArray[pos];
- ClientSession session = failoverManager.createSession(username,
- password,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- ackBatchSize,
- cacheLargeMessagesClient,
- minLargeMessageSize,
- blockOnAcknowledge,
- autoGroup,
- confirmationWindowSize,
- producerWindowSize,
- consumerWindowSize,
- producerMaxRate,
- consumerMaxRate,
- blockOnNonPersistentSend,
- blockOnPersistentSend,
- initialMessagePacketSize,
- groupID);
+ ClientSession session = failoverManager.createSession(username,
+ password,
+ xa,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ ackBatchSize,
+ cacheLargeMessagesClient,
+ minLargeMessageSize,
+ blockOnAcknowledge,
+ autoGroup,
+ confirmationWindowSize,
+ producerWindowSize,
+ consumerWindowSize,
+ producerMaxRate,
+ consumerMaxRate,
+ blockOnNonPersistentSend,
+ blockOnPersistentSend,
+ initialMessagePacketSize,
+ groupID);
- return session;
+ return session;
+ }
}
private void instantiateLoadBalancingPolicy()
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-03 14:02:04 UTC (rev 8523)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-12-03 14:05:00 UTC (rev 8524)
@@ -922,7 +922,6 @@
if (message.isDurable() && durable)
{
storageManager.updateDeliveryCount(reference);
- storageManager.waitOnOperations();
}
AddressSettings addressSettings =
addressSettingsRepository.getMatch(address.toString());
@@ -932,7 +931,6 @@
if (maxDeliveries > 0 && reference.getDeliveryCount() >=
maxDeliveries)
{
sendToDeadLetterAddress(reference);
- storageManager.waitOnOperations();
return false;
}