JBoss hornetq SVN: r8516 - trunk/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: jmesnil
Date: 2009-12-03 03:29:30 -0500 (Thu, 03 Dec 2009)
New Revision: 8516
Modified:
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
Log:
Factored the duplicate ServerSessionImpl.getCreditManagerHolder() methods into a single method
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-03 07:25:42 UTC (rev 8515)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-12-03 08:29:30 UTC (rev 8516)
@@ -23,8 +23,6 @@
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
-import java.util.concurrent.Executors;
-import java.util.concurrent.LinkedBlockingQueue;
import javax.transaction.xa.XAException;
import javax.transaction.xa.XAResource;
@@ -1901,29 +1899,13 @@
*/
private void releaseOutStanding(final ServerMessage message, final int credits) throws Exception
{
- CreditManagerHolder holder = getCreditManagerHolder(message);
+ CreditManagerHolder holder = getCreditManagerHolder(message.getDestination());
holder.outstandingCredits -= credits;
holder.store.returnProducerCredits(credits);
}
- private CreditManagerHolder getCreditManagerHolder(final ServerMessage message) throws Exception
- {
- SimpleString address = message.getDestination();
-
- CreditManagerHolder holder = creditManagerHolders.get(address);
-
- if (holder == null)
- {
- holder = new CreditManagerHolder(message.getPagingStore());
-
- creditManagerHolders.put(address, holder);
- }
-
- return holder;
- }
-
private CreditManagerHolder getCreditManagerHolder(final SimpleString address) throws Exception
{
CreditManagerHolder holder = creditManagerHolders.get(address);
15 years, 1 month
JBoss hornetq SVN: r8515 - in trunk/tests/src/org/hornetq/tests: util and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-03 02:25:42 -0500 (Thu, 03 Dec 2009)
New Revision: 8515
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java
trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
tweaks / fixing compilation
Modified: trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java 2009-12-03 07:22:04 UTC (rev 8514)
+++ trunk/tests/src/org/hornetq/tests/integration/client/NettyConsumerWindowSizeTest.java 2009-12-03 07:25:42 UTC (rev 8515)
@@ -13,14 +13,7 @@
package org.hornetq.tests.integration.client;
-import java.util.HashMap;
-import org.hornetq.core.client.ClientSessionFactory;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.integration.transports.netty.TransportConstants;
-
/**
* A NettyConsumerWindowSizeTest
*
@@ -40,39 +33,6 @@
return true;
}
- protected ClientSessionFactory createNettyFactory()
- {
- HashMap<String, Object> parameters = new HashMap<String, Object>();
-
- parameters.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
-
- TransportConfiguration config = new TransportConfiguration(NETTY_CONNECTOR_FACTORY, parameters);
-
- return new ClientSessionFactoryImpl(config);
-
- //return super.createNettyFactory();
- }
-
- protected Configuration createDefaultConfig(final boolean netty)
- {
- if (netty)
- {
-
- HashMap<String, Object> parameters = new HashMap<String, Object>();
-
- parameters.put(TransportConstants.TCP_NODELAY_PROPNAME, true);
-
- return createDefaultConfig(parameters, INVM_ACCEPTOR_FACTORY, NETTY_ACCEPTOR_FACTORY);
- }
- else
- {
- new Exception("This test wasn't supposed to use InVM").printStackTrace();
- return super.createDefaultConfig(false);
- }
- }
-
-
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-12-03 07:22:04 UTC (rev 8514)
+++ trunk/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-12-03 07:25:42 UTC (rev 8515)
@@ -318,7 +318,7 @@
return configuration;
}
- protected ClientSessionFactory createFactory(boolean isNetty)
+ protected ClientSessionFactoryImpl createFactory(boolean isNetty)
{
if (isNetty)
{
@@ -330,17 +330,17 @@
}
}
- protected ClientSessionFactory createInVMFactory()
+ protected ClientSessionFactoryImpl createInVMFactory()
{
return createFactory(INVM_CONNECTOR_FACTORY);
}
- protected ClientSessionFactory createNettyFactory()
+ protected ClientSessionFactoryImpl createNettyFactory()
{
return createFactory(NETTY_CONNECTOR_FACTORY);
}
- protected ClientSessionFactory createFactory(final String connectorClass)
+ protected ClientSessionFactoryImpl createFactory(final String connectorClass)
{
return new ClientSessionFactoryImpl(new TransportConfiguration(connectorClass), null);
15 years, 1 month
JBoss hornetq SVN: r8514 - trunk/tests/src/org/hornetq/tests/integration/cluster/reattach.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-03 02:22:04 -0500 (Thu, 03 Dec 2009)
New Revision: 8514
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
Log:
Adding the tests I used to capture a multi-threading issue with ClientSessionFactory
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-12-03 07:06:15 UTC (rev 8513)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/ReattachTest.java 2009-12-03 07:22:04 UTC (rev 8514)
@@ -13,26 +13,25 @@
package org.hornetq.tests.integration.cluster.reattach;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.CountDownLatch;
+
import org.hornetq.core.client.ClientConsumer;
import org.hornetq.core.client.ClientMessage;
import org.hornetq.core.client.ClientProducer;
import org.hornetq.core.client.ClientSession;
import org.hornetq.core.client.SessionFailureListener;
-import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ClientSessionFactoryInternal;
import org.hornetq.core.client.impl.ClientSessionInternal;
-import org.hornetq.core.config.Configuration;
-import org.hornetq.core.config.TransportConfiguration;
-import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.RemotingConnection;
import org.hornetq.core.remoting.impl.invm.InVMConnector;
import org.hornetq.core.remoting.impl.invm.InVMRegistry;
-import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQTextMessage;
-import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.SimpleString;
/**
@@ -45,7 +44,7 @@
*
*
*/
-public class ReattachTest extends UnitTestCase
+public class ReattachTest extends ServiceTestBase
{
private static final Logger log = Logger.getLogger(ReattachTest.class);
@@ -74,7 +73,7 @@
final int reconnectAttempts = 1;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -152,7 +151,7 @@
final int reconnectAttempts = -1;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -242,7 +241,7 @@
final long asyncFailDelay = 2000;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -261,9 +260,9 @@
{
failed = true;
}
-
+
public void beforeReconnect(HornetQException exception)
- {
+ {
}
}
@@ -356,7 +355,7 @@
final int reconnectAttempts = 3;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -411,12 +410,12 @@
conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
- //Should throw exception since didn't reconnect
-
+ // Should throw exception since didn't reconnect
+
try
{
session.start();
-
+
fail("Should throw exception");
}
catch (HornetQException e)
@@ -431,6 +430,299 @@
t.join();
}
+ public void testCreateSessionFailAfterSendSeveralThreads() throws Throwable
+ {
+
+ Timer timer = new Timer();
+ ClientSession session = null;
+
+ try
+ {
+
+ final long retryInterval = 50;
+
+ final double retryMultiplier = 1d;
+
+ final int reconnectAttempts = -1;
+
+ final ClientSessionFactoryInternal sf = createFactory(false);
+
+
+
+ sf.setRetryInterval(retryInterval);
+ sf.setRetryIntervalMultiplier(retryMultiplier);
+ sf.setReconnectAttempts(reconnectAttempts);
+ sf.setConfirmationWindowSize(1024 * 1024);
+
+
+ session = sf.createSession();
+
+ final RemotingConnection connFailure = ((ClientSessionInternal)session).getConnection();
+
+
+
+ int numberOfThreads = 100;
+ final int numberOfSessionsToCreate = 10;
+
+ final CountDownLatch alignLatch = new CountDownLatch(numberOfThreads);
+ final CountDownLatch startFlag = new CountDownLatch(1);
+
+ class CreateSessionThread extends Thread
+ {
+ Throwable failure;
+
+ public void run()
+ {
+ try
+ {
+ alignLatch.countDown();
+ startFlag.await();
+ for (int i = 0 ; i < numberOfSessionsToCreate; i++)
+ {
+ Thread.yield();
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.close();
+ }
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ this.failure = e;
+ }
+ }
+ }
+
+ CreateSessionThread threads[] = new CreateSessionThread[numberOfThreads];
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ threads[i] = new CreateSessionThread();
+ threads[i].start();
+ }
+
+ // Sleep 3 times retryInterval, so it should at least have 3 retries
+
+ alignLatch.await();
+
+ timer.schedule(new TimerTask()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ connFailure.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+ }
+ catch (Exception e)
+ {
+ log.warn("Error on the timer " + e);
+ }
+ }
+
+ }, 10, 10);
+
+ startFlag.countDown();
+
+ Throwable failure = null;
+
+ for (CreateSessionThread thread : threads)
+ {
+ thread.join();
+ if (thread.failure != null)
+ {
+ System.out.println("Thread " + thread.getName() + " failed - " + thread.failure);
+ failure = thread.failure;
+ }
+ }
+
+ if (failure != null)
+ {
+ throw failure;
+ }
+
+ sf.close();
+
+ }
+ finally
+ {
+ timer.cancel();
+
+ if (session != null)
+ {
+ session.close();
+ }
+ }
+ }
+
+ public void testCreateSessionFailBeforeSendSeveralThreads() throws Throwable
+ {
+ final long retryInterval = 500;
+
+ final double retryMultiplier = 1d;
+
+ final int reconnectAttempts = -1;
+
+ final ClientSessionFactoryInternal sf = createFactory(false);
+
+ sf.setRetryInterval(retryInterval);
+ sf.setRetryIntervalMultiplier(retryMultiplier);
+ sf.setReconnectAttempts(reconnectAttempts);
+ sf.setConfirmationWindowSize(1024 * 1024);
+
+ InVMConnector.failOnCreateConnection = true;
+
+ int numberOfThreads = 100;
+
+ final CountDownLatch alignLatch = new CountDownLatch(numberOfThreads);
+ final CountDownLatch startFlag = new CountDownLatch(1);
+
+ class CreateSessionThread extends Thread
+ {
+ Throwable failure;
+
+ public void run()
+ {
+ try
+ {
+ alignLatch.countDown();
+ startFlag.await();
+ ClientSession session = sf.createSession(false, true, true);
+
+ session.close();
+ }
+ catch (Throwable e)
+ {
+ e.printStackTrace();
+ this.failure = e;
+ }
+ }
+ }
+
+ CreateSessionThread threads[] = new CreateSessionThread[numberOfThreads];
+ for (int i = 0; i < numberOfThreads; i++)
+ {
+ threads[i] = new CreateSessionThread();
+ threads[i].start();
+ }
+
+ // Sleep 3 times retryInterval, so it should at least have 3 retries
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(retryInterval * 3);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ InVMConnector.failOnCreateConnection = false;
+ }
+ };
+
+ alignLatch.await();
+
+ t.start();
+
+ startFlag.countDown();
+
+ Throwable failure = null;
+
+ for (CreateSessionThread thread : threads)
+ {
+ thread.join();
+ if (thread.failure != null)
+ {
+ System.out.println("Thread " + thread.getName() + " failed - " + thread.failure);
+ failure = thread.failure;
+ }
+ }
+
+ if (failure != null)
+ {
+ throw failure;
+ }
+
+ sf.close();
+
+ t.join();
+ }
+
+ public void testCreateQueue() throws Exception
+ {
+ final long retryInterval = 50;
+
+ final double retryMultiplier = 1d;
+
+ final int reconnectAttempts = -1;
+
+ ClientSessionFactoryInternal sf = createFactory(false);
+
+ sf.setRetryInterval(retryInterval);
+ sf.setRetryIntervalMultiplier(retryMultiplier);
+ sf.setReconnectAttempts(reconnectAttempts);
+ sf.setConfirmationWindowSize(1024 * 1024);
+
+ ClientSession session = sf.createSession(false, true, true);
+
+ // Sleep 3 times retryInterval, so it should at least have 3 retries
+
+ RemotingConnection conn = ((ClientSessionInternal)session).getConnection();
+
+ InVMConnector.failOnCreateConnection = false;
+
+ conn.fail(new HornetQException(HornetQException.NOT_CONNECTED));
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ Thread.sleep(retryInterval * 3);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+
+ InVMConnector.failOnCreateConnection = false;
+ }
+ };
+
+ t.start();
+
+ for (int i = 0; i < 10; i++)
+ {
+ session.createQueue("address", "queue" + i);
+ }
+
+ //
+ // InVMConnector.failOnCreateConnection = true;
+
+ //
+ // //Should throw exception since didn't reconnect
+ //
+ // try
+ // {
+ // session.start();
+ //
+ // fail("Should throw exception");
+ // }
+ // catch (HornetQException e)
+ // {
+ // assertEquals(HornetQException.OBJECT_CLOSED, e.getCode());
+ // }
+
+ session.close();
+
+ sf.close();
+
+ t.join();
+ }
+
public void testReattachAttemptsSucceedsInReconnecting() throws Exception
{
final long retryInterval = 500;
@@ -439,13 +731,13 @@
final int reconnectAttempts = 10;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
sf.setReconnectAttempts(reconnectAttempts);
sf.setConfirmationWindowSize(1024 * 1024);
-
+
ClientSession session = sf.createSession(false, true, true);
session.createQueue(ADDRESS, ADDRESS, null, false);
@@ -507,7 +799,7 @@
final int reconnectAttempts = -1;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -599,7 +891,7 @@
final int reconnectAttempts = -1;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -677,7 +969,7 @@
final long maxRetryInterval = 1000;
- ClientSessionFactoryInternal sf = new ClientSessionFactoryImpl(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));
+ ClientSessionFactoryInternal sf = createFactory(false);
sf.setRetryInterval(retryInterval);
sf.setRetryIntervalMultiplier(retryMultiplier);
@@ -757,11 +1049,8 @@
{
super.setUp();
- Configuration liveConf = new ConfigurationImpl();
- liveConf.setSecurityEnabled(false);
- liveConf.getAcceptorConfigurations()
- .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
- service = HornetQ.newHornetQServer(liveConf, false);
+ service = createServer(false, false);
+
service.start();
}
15 years, 1 month
JBoss hornetq SVN: r8513 - trunk/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-03 02:06:15 -0500 (Thu, 03 Dec 2009)
New Revision: 8513
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
Reverting my last commit, and removing the unnecessary synchronized block (since the whole method is synchronized now)
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-03 07:01:31 UTC (rev 8512)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-03 07:06:15 UTC (rev 8513)
@@ -264,58 +264,54 @@
}
}
- private synchronized void initialise() throws Exception
+ private void initialise() throws Exception
{
- if (!readOnly)
- {
- readOnly = true;
- setThreadPools();
+ setThreadPools();
- instantiateLoadBalancingPolicy();
+ instantiateLoadBalancingPolicy();
- if (discoveryAddress != null)
- {
- InetAddress groupAddress = InetAddress.getByName(discoveryAddress);
+ if (discoveryAddress != null)
+ {
+ InetAddress groupAddress = InetAddress.getByName(discoveryAddress);
- discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
- discoveryAddress,
- groupAddress,
- discoveryPort,
- discoveryRefreshTimeout);
+ discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
+ discoveryAddress,
+ groupAddress,
+ discoveryPort,
+ discoveryRefreshTimeout);
- discoveryGroup.registerListener(this);
+ discoveryGroup.registerListener(this);
- discoveryGroup.start();
- }
- else if (staticConnectors != null)
+ discoveryGroup.start();
+ }
+ else if (staticConnectors != null)
+ {
+ for (Pair<TransportConfiguration, TransportConfiguration> pair : staticConnectors)
{
- for (Pair<TransportConfiguration, TransportConfiguration> pair : staticConnectors)
- {
- FailoverManager cm = new FailoverManagerImpl(this,
- pair.a,
- pair.b,
- failoverOnServerShutdown,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
+ FailoverManager cm = new FailoverManagerImpl(this,
+ pair.a,
+ pair.b,
+ failoverOnServerShutdown,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
- failoverManagerMap.put(pair, cm);
- }
-
- updatefailoverManagerArray();
+ failoverManagerMap.put(pair, cm);
}
- else
- {
- throw new IllegalStateException("Before using a session factory you must either set discovery address and port or " + "provide some static transport configuration");
- }
+
+ updatefailoverManagerArray();
}
+ else
+ {
+ throw new IllegalStateException("Before using a session factory you must either set discovery address and port or " + "provide some static transport configuration");
+ }
}
// Static
@@ -1081,7 +1077,7 @@
}
}
- private ClientSession createSessionInternal(final String username,
+ private synchronized ClientSession createSessionInternal(final String username,
final String password,
final boolean xa,
final boolean autoCommitSends,
@@ -1104,6 +1100,8 @@
{
throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
}
+
+ readOnly = true;
}
if (discoveryGroup != null && !receivedBroadcast)
@@ -1117,35 +1115,32 @@
}
}
- synchronized (this)
- {
- int pos = loadBalancingPolicy.select(failoverManagerArray.length);
+ int pos = loadBalancingPolicy.select(failoverManagerArray.length);
- FailoverManager failoverManager = failoverManagerArray[pos];
+ FailoverManager failoverManager = failoverManagerArray[pos];
- ClientSession session = failoverManager.createSession(username,
- password,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- ackBatchSize,
- cacheLargeMessagesClient,
- minLargeMessageSize,
- blockOnAcknowledge,
- autoGroup,
- confirmationWindowSize,
- producerWindowSize,
- consumerWindowSize,
- producerMaxRate,
- consumerMaxRate,
- blockOnNonPersistentSend,
- blockOnPersistentSend,
- initialMessagePacketSize,
- groupID);
+ ClientSession session = failoverManager.createSession(username,
+ password,
+ xa,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ ackBatchSize,
+ cacheLargeMessagesClient,
+ minLargeMessageSize,
+ blockOnAcknowledge,
+ autoGroup,
+ confirmationWindowSize,
+ producerWindowSize,
+ consumerWindowSize,
+ producerMaxRate,
+ consumerMaxRate,
+ blockOnNonPersistentSend,
+ blockOnPersistentSend,
+ initialMessagePacketSize,
+ groupID);
- return session;
- }
+ return session;
}
private void instantiateLoadBalancingPolicy()
15 years, 1 month
JBoss hornetq SVN: r8512 - trunk/src/main/org/hornetq/core/replication/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-03 02:01:31 -0500 (Thu, 03 Dec 2009)
New Revision: 8512
Modified:
trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
Log:
Adding confirmation call on the ReplicationManager
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-12-02 23:51:36 UTC (rev 8511)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-12-03 07:01:31 UTC (rev 8512)
@@ -172,6 +172,9 @@
log.warn(e.getMessage(), e);
response = new HornetQExceptionMessage((HornetQException)e);
}
+
+ channel.confirm(packet);
+
channel.send(response);
}
Modified: trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-12-02 23:51:36 UTC (rev 8511)
+++ trunk/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-12-03 07:01:31 UTC (rev 8512)
@@ -491,6 +491,8 @@
{
replicated();
}
+
+ replicatingChannel.confirm(packet);
}
}
15 years, 1 month
JBoss hornetq SVN: r8511 - trunk/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-02 18:51:36 -0500 (Wed, 02 Dec 2009)
New Revision: 8511
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
just a tweak
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-02 23:05:09 UTC (rev 8510)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-02 23:51:36 UTC (rev 8511)
@@ -264,54 +264,58 @@
}
}
- private void initialise() throws Exception
+ private synchronized void initialise() throws Exception
{
- setThreadPools();
+ if (!readOnly)
+ {
+ readOnly = true;
+ setThreadPools();
- instantiateLoadBalancingPolicy();
+ instantiateLoadBalancingPolicy();
- if (discoveryAddress != null)
- {
- InetAddress groupAddress = InetAddress.getByName(discoveryAddress);
+ if (discoveryAddress != null)
+ {
+ InetAddress groupAddress = InetAddress.getByName(discoveryAddress);
- discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
- discoveryAddress,
- groupAddress,
- discoveryPort,
- discoveryRefreshTimeout);
+ discoveryGroup = new DiscoveryGroupImpl(UUIDGenerator.getInstance().generateStringUUID(),
+ discoveryAddress,
+ groupAddress,
+ discoveryPort,
+ discoveryRefreshTimeout);
- discoveryGroup.registerListener(this);
+ discoveryGroup.registerListener(this);
- discoveryGroup.start();
- }
- else if (staticConnectors != null)
- {
- for (Pair<TransportConfiguration, TransportConfiguration> pair : staticConnectors)
+ discoveryGroup.start();
+ }
+ else if (staticConnectors != null)
{
- FailoverManager cm = new FailoverManagerImpl(this,
- pair.a,
- pair.b,
- failoverOnServerShutdown,
- callTimeout,
- clientFailureCheckPeriod,
- connectionTTL,
- retryInterval,
- retryIntervalMultiplier,
- maxRetryInterval,
- reconnectAttempts,
- threadPool,
- scheduledThreadPool,
- interceptors);
+ for (Pair<TransportConfiguration, TransportConfiguration> pair : staticConnectors)
+ {
+ FailoverManager cm = new FailoverManagerImpl(this,
+ pair.a,
+ pair.b,
+ failoverOnServerShutdown,
+ callTimeout,
+ clientFailureCheckPeriod,
+ connectionTTL,
+ retryInterval,
+ retryIntervalMultiplier,
+ maxRetryInterval,
+ reconnectAttempts,
+ threadPool,
+ scheduledThreadPool,
+ interceptors);
- failoverManagerMap.put(pair, cm);
+ failoverManagerMap.put(pair, cm);
+ }
+
+ updatefailoverManagerArray();
}
-
- updatefailoverManagerArray();
+ else
+ {
+ throw new IllegalStateException("Before using a session factory you must either set discovery address and port or " + "provide some static transport configuration");
+ }
}
- else
- {
- throw new IllegalStateException("Before using a session factory you must either set discovery address and port or " + "provide some static transport configuration");
- }
}
// Static
@@ -1077,7 +1081,7 @@
}
}
- private synchronized ClientSession createSessionInternal(final String username,
+ private ClientSession createSessionInternal(final String username,
final String password,
final boolean xa,
final boolean autoCommitSends,
@@ -1100,8 +1104,6 @@
{
throw new HornetQException(HornetQException.INTERNAL_ERROR, "Failed to initialise session factory", e);
}
-
- readOnly = true;
}
if (discoveryGroup != null && !receivedBroadcast)
15 years, 1 month
JBoss hornetq SVN: r8510 - trunk/tests/src/org/hornetq/tests/integration/cluster/reattach.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-12-02 18:05:09 -0500 (Wed, 02 Dec 2009)
New Revision: 8510
Modified:
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
Log:
1000 iterations is too much for the Hudson test suite; using 10 instead
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-12-02 21:57:11 UTC (rev 8509)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-12-02 23:05:09 UTC (rev 8510)
@@ -1278,7 +1278,7 @@
protected int getNumIterations()
{
- return 1000;
+ return 10;
}
@Override
15 years, 1 month
JBoss hornetq SVN: r8509 - in trunk: tests/src/org/hornetq/tests/integration/cluster/reattach and 1 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-02 16:57:11 -0500 (Wed, 02 Dec 2009)
New Revision: 8509
Removed:
trunk/tests/src/org/hornetq/tests/util/sizeof/
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
Log:
removed unnecessary tests
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-02 20:51:57 UTC (rev 8508)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2009-12-02 21:57:11 UTC (rev 8509)
@@ -1077,7 +1077,7 @@
}
}
- private ClientSession createSessionInternal(final String username,
+ private synchronized ClientSession createSessionInternal(final String username,
final String password,
final boolean xa,
final boolean autoCommitSends,
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-02 20:51:57 UTC (rev 8508)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-02 21:57:11 UTC (rev 8509)
@@ -731,6 +731,7 @@
{
if (closed)
{
+ log.info("Already closed so not closing");
return;
}
Modified: trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-12-02 20:51:57 UTC (rev 8508)
+++ trunk/src/main/org/hornetq/core/client/impl/DelegatingSession.java 2009-12-02 21:57:11 UTC (rev 8509)
@@ -72,7 +72,8 @@
{
if (!session.isClosed())
{
- log.warn("I'm closing a core ClientSession you left open. Please make sure you close all ClientSessions explicitly " + "before letting them go out of scope!");
+ log.warn("I'm closing a core ClientSession you left open. Please make sure you close all ClientSessions explicitly " + "before letting them go out of scope! " +
+ System.identityHashCode(this));
log.warn("The ClientSession you didn't close was created here:", creationStack);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-12-02 20:51:57 UTC (rev 8508)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/reattach/MultiThreadRandomReattachTestBase.java 2009-12-02 21:57:11 UTC (rev 8509)
@@ -1097,13 +1097,17 @@
*/
protected void doTestL(final ClientSessionFactory sf) throws Exception
{
- final int numSessions = 10;
+ final int numSessions = 1000;
for (int i = 0; i < numSessions; i++)
{
ClientSession session = sf.createSession(false, false, false);
+
+ log.info("Created session " + System.identityHashCode(session));
session.close();
+
+ log.info("closed session");
}
}
@@ -1274,7 +1278,7 @@
protected int getNumIterations()
{
- return 3;
+ return 1000;
}
@Override
15 years, 1 month
JBoss hornetq SVN: r8508 - trunk/src/main/org/hornetq/utils.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-02 15:51:57 -0500 (Wed, 02 Dec 2009)
New Revision: 8508
Added:
trunk/src/main/org/hornetq/utils/MemorySize.java
Log:
added missing class
Added: trunk/src/main/org/hornetq/utils/MemorySize.java
===================================================================
--- trunk/src/main/org/hornetq/utils/MemorySize.java (rev 0)
+++ trunk/src/main/org/hornetq/utils/MemorySize.java 2009-12-02 20:51:57 UTC (rev 8508)
@@ -0,0 +1,143 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.lang.ref.WeakReference;
+
+import org.hornetq.core.logging.Logger;
+
+/**
+ * Estimates the average heap footprint of objects built by an {@link ObjectFactory}, by comparing
+ * used heap (read after forced GCs) before and after allocating a batch; also has a 64-bit arch check.
+ *
+ * @author Clebert Suconic
+ * @author Tim Fox
+ *
+ */
+public class MemorySize
+{
+ private static final Logger log = Logger.getLogger(MemorySize.class);
+
+ private static final int numberOfObjects = 10000; // instances allocated in the measured batch (the dry run uses twice as many)
+
+ private static Object newObject(final ObjectFactory factory) throws Exception // thin indirection over factory.createObject()
+ {
+ return factory.createObject();
+ }
+
+ public static boolean is64bitArch() // true when the "os.arch" system property contains "64", or when it cannot be determined
+ {
+ boolean is64bit = true; //Default to 64 e.g. if can't retrieve property
+
+ try
+ {
+ String arch = System.getProperty("os.arch");
+
+ if (arch != null)
+ {
+ is64bit = arch.contains("64"); // any arch name containing "64" is treated as 64-bit
+ }
+ }
+ catch (Exception e)
+ {
+ // Ignore: the 64-bit default is kept if reading the property fails
+ }
+
+ return is64bit;
+ }
+
+ public interface ObjectFactory // callback that supplies the instances to be measured
+ {
+ Object createObject(); // called once per instance needed by calculateSize
+ }
+
+ public static int calculateSize(final ObjectFactory factory) throws Exception // returns the rounded average used-heap growth per created object
+ {
+ final Runtime runtime = Runtime.getRuntime();
+
+ getMemorySize(runtime); // result discarded; called only for its forced-GC side effect
+
+ newObject(factory); // throwaway instance (presumably so class loading/initialisation happens before measuring - confirm)
+
+ int i = 0;
+ long heap1 = 0;
+ long heap2 = 0;
+ long totalMemory1 = 0;
+ long totalMemory2 = 0;
+
+ //First we do a dry run with twice as many then throw away the results
+
+ Object[] obj = new Object[numberOfObjects * 2];
+
+ for (i = 0; i < numberOfObjects * 2; i++)
+ {
+ obj[i] = newObject(factory);
+ }
+
+ obj = new Object[numberOfObjects * 2]; // drops the dry-run objects; the fresh array exists before the baseline reading below
+
+ heap1 = getMemorySize(runtime); // baseline: used heap after forced GCs
+
+ totalMemory1 = runtime.totalMemory();
+
+ for (i = 0; i < numberOfObjects; i++) // measured batch: only the first numberOfObjects slots are filled
+ {
+ obj[i] = newObject(factory);
+ }
+
+ heap2 = getMemorySize(runtime); // used heap while the batch is still referenced by obj
+
+ totalMemory2 = runtime.totalMemory();
+
+ final int size = Math.round(((float)(heap2 - heap1)) / numberOfObjects); // average bytes per object, rounded to the nearest int
+
+ if (totalMemory1 != totalMemory2) // total heap size changed between the two readings; only a warning is logged
+ {
+ //throw new IllegalStateException("Warning: JVM allocated more data what would make results invalid " +
+ // totalMemory1 + ":" + totalMemory2);
+
+ log.warn("Warning: JVM allocated more data what would make results invalid " +
+ totalMemory1 + ":" + totalMemory2);
+ }
+
+ return size;
+ }
+
+ private static long getMemorySize(final Runtime runtime) // used heap (total - free) read after five forceGC() rounds
+ {
+ for (int i = 0; i < 5; i++)
+ {
+ forceGC();
+ }
+ return runtime.totalMemory() - runtime.freeMemory();
+ }
+
+ private static void forceGC() // blocks until a freshly created, weakly referenced object has been collected
+ {
+ WeakReference<Object> dumbReference = new WeakReference<Object>(new Object());
+ // Keep requesting GC until the weak reference is cleared; each pass sleeps 500 ms and there is no upper bound on passes
+ while (dumbReference.get() != null)
+ {
+ System.gc();
+ try
+ {
+ Thread.sleep(500);
+ }
+ catch (InterruptedException e)
+ { // NOTE(review): interruption is swallowed and the interrupt status is not restored; the loop just continues
+ }
+ }
+ }
+
+}
15 years, 1 month
JBoss hornetq SVN: r8507 - in trunk/tests/src/org/hornetq/tests/integration: cluster/bridge and 1 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-12-02 15:50:50 -0500 (Wed, 02 Dec 2009)
New Revision: 8507
Modified:
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
trunk/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java
Log:
fixed some tests
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-12-02 20:03:15 UTC (rev 8506)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-12-02 20:50:50 UTC (rev 8507)
@@ -1008,7 +1008,7 @@
pagedDestinationA.setPageSizeBytes(1024);
pagedDestinationA.setMaxSizeBytes(10 * 1024);
- int NUMBER_MESSAGES_BEFORE_PAGING = 20;
+ int NUMBER_MESSAGES_BEFORE_PAGING = 11;
addresses.put(PAGED_ADDRESS_A.toString(), pagedDestinationA);
Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-12-02 20:03:15 UTC (rev 8506)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/bridge/BridgeTest.java 2009-12-02 20:50:50 UTC (rev 8507)
@@ -677,7 +677,7 @@
for (int i = 0; i < numMessages; i++)
{
- ClientMessage message = consumer1.receive(200);
+ ClientMessage message = consumer1.receive(5000);
assertNotNull(message);
Modified: trunk/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java 2009-12-02 20:03:15 UTC (rev 8506)
+++ trunk/tests/src/org/hornetq/tests/integration/management/AddressControlTest.java 2009-12-02 20:50:50 UTC (rev 8507)
@@ -344,7 +344,7 @@
AddressSettings addressSettings = new AddressSettings();
addressSettings.setPageSizeBytes(1024);
addressSettings.setMaxSizeBytes(10 * 1024);
- int NUMBER_MESSAGES_BEFORE_PAGING = 14;
+ int NUMBER_MESSAGES_BEFORE_PAGING = 5;
server.getAddressSettingsRepository().addMatch(address.toString(), addressSettings);
server.start();
15 years, 1 month