Author: clebert.suconic@jboss.com
Date: 2011-09-24 16:49:52 -0400 (Sat, 24 Sep 2011)
New Revision: 11409
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
fixing test
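
The substantive change in this revision wraps HornetQServerImpl.start() in a try/finally that clears the thread-local OperationContext both on entry and on exit, so an embedded application calling start() never inherits a dirty context left over from the calling thread. A minimal sketch of the pattern follows; doStart() is a hypothetical stand-in for the original method body, which starts the node manager, activation and connector service as the diff below shows.

   import org.hornetq.core.persistence.impl.journal.OperationContextImpl;

   public synchronized void start() throws Exception
   {
      // drop whatever context the calling thread carried in
      OperationContextImpl.clearContext();

      try
      {
         doStart(); // hypothetical: node manager, activation, connector service startup
      }
      finally
      {
         // this avoids embedded applications using dirty contexts from startup
         OperationContextImpl.clearContext();
      }
   }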
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-24 16:16:41 UTC (rev 11408)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java	2011-09-24 20:49:52 UTC (rev 11409)
@@ -65,11 +65,13 @@
import org.hornetq.core.paging.impl.PagingManagerImpl;
import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.GroupingInfo;
+import org.hornetq.core.persistence.OperationContext;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.persistence.config.PersistedAddressSetting;
import org.hornetq.core.persistence.config.PersistedRoles;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.persistence.impl.journal.OperationContextImpl;
import org.hornetq.core.persistence.impl.nullpm.NullStorageManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.DuplicateIDCache;
@@ -137,11 +139,13 @@
   // ------------------------------------------------------------------------------------
private static final Logger log = Logger.getLogger(HornetQServerImpl.class);
-
-   // JMS Topics (which are outside of the scope of the core API) will require a dumb subscription with a dummy-filter at this current version
+
+   // JMS Topics (which are outside of the scope of the core API) will require a dumb subscription with a dummy-filter
+   // at this current version
   // as a way to keep its existence valid and TCK tests
   // That subscription needs an invalid filter, however paging needs to ignore any subscription with this filter.
-   // For that reason, this filter needs to be rejected on paging or any other component on the system, and just be ignored for any purpose
+   // For that reason, this filter needs to be rejected on paging or any other component on the system, and just be
+   // ignored for any purpose
// It's declared here as this filter is considered a global ignore
public static final String GENERIC_IGNORED_FILTER = "__HQX=-1";
@@ -151,7 +155,6 @@
// Attributes
   // -----------------------------------------------------------------------------------
-
private final Version version;
private final HornetQSecurityManager securityManager;
@@ -161,7 +164,7 @@
private final MBeanServer mbeanServer;
private volatile boolean started;
-
+
private volatile boolean stopped;
private volatile SecurityStore securityStore;
@@ -171,7 +174,7 @@
private volatile QueueFactory queueFactory;
private volatile PagingManager pagingManager;
-
+
private volatile PostOffice postOffice;
private volatile ExecutorService threadPool;
@@ -193,7 +196,7 @@
private volatile RemotingService remotingService;
private volatile ManagementService managementService;
-
+
private volatile ConnectorsService connectorsService;
private MemoryManager memoryManager;
@@ -214,7 +217,7 @@
private boolean initialised;
- // private FailoverManager replicationFailoverManager;
+ // private FailoverManager replicationFailoverManager;
private ReplicationManager replicationManager;
@@ -223,17 +226,16 @@
   private final Set<ActivateCallback> activateCallbacks = new HashSet<ActivateCallback>();
private volatile GroupingHandler groupingHandler;
-
+
private NodeManager nodeManager;
-
+
// Used to identify the server on tests... useful on debugging testcases
private String identity;
-
+
private Thread backupActivationThread;
private Activation activation;
-
// Constructors
// ---------------------------------------------------------------------------------
@@ -312,74 +314,89 @@
public synchronized void start() throws Exception
{
- stopped = false;
-
- initialiseLogging();
+ OperationContextImpl.clearContext();
- checkJournalDirectory();
+ try
+ {
+ stopped = false;
- nodeManager = createNodeManager(configuration.getJournalDirectory());
+ initialiseLogging();
- nodeManager.start();
+ checkJournalDirectory();
- if (started)
- {
-         HornetQServerImpl.log.info((configuration.isBackup() ? "backup" : "live") + " is already started, ignoring the call to start..");
- return;
- }
+ nodeManager = createNodeManager(configuration.getJournalDirectory());
-      HornetQServerImpl.log.info((configuration.isBackup() ? "backup" : "live") + " server is starting with configuration " + configuration);
+ nodeManager.start();
- if (configuration.isRunSyncSpeedTest())
- {
- SyncSpeedTest test = new SyncSpeedTest();
-
- test.run();
- }
-
- if (!configuration.isBackup())
- {
-         if (configuration.isSharedStore() && configuration.isPersistenceEnabled())
+ if (started)
{
- activation = new SharedStoreLiveActivation();
+            HornetQServerImpl.log.info((configuration.isBackup() ? "backup" : "live") + " is already started, ignoring the call to start..");
+ return;
+ }
- // This should block until the lock is got
+         HornetQServerImpl.log.info((configuration.isBackup() ? "backup" : "live") + " server is starting with configuration " +
+                                    configuration);
- activation.run();
- }
- else
+ if (configuration.isRunSyncSpeedTest())
{
- activation = new NoSharedStoreLiveActivation();
+ SyncSpeedTest test = new SyncSpeedTest();
- activation.run();
+ test.run();
}
- started = true;
- HornetQServerImpl.log.info("HornetQ Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "]" +
(this.identity != null ? " (" + identity : ")") + "
started");
- }
+ if (!configuration.isBackup())
+ {
+            if (configuration.isSharedStore() && configuration.isPersistenceEnabled())
+ {
+ activation = new SharedStoreLiveActivation();
+ // This should block until the lock is got
-      // The activation on fail-back may change the value of isBackup, for that reason we are not using else here
- if (configuration.isBackup())
- {
- if (configuration.isSharedStore())
- {
- activation = new SharedStoreBackupActivation();
+ activation.run();
+ }
+ else
+ {
+ activation = new NoSharedStoreLiveActivation();
+
+ activation.run();
+ }
+ started = true;
+
+ HornetQServerImpl.log.info("HornetQ Server version " +
getVersion().getFullVersion() +
+ " [" +
+ nodeManager.getNodeId() +
+ "]" +
+ (this.identity != null ? " (" + identity
: ")") +
+ " started");
}
- else
+
+         // The activation on fail-back may change the value of isBackup, for that reason we are not using else here
+ if (configuration.isBackup())
{
- // Replicated
+ if (configuration.isSharedStore())
+ {
+ activation = new SharedStoreBackupActivation();
+ }
+ else
+ {
+ // Replicated
- activation = new SharedNothingBackupActivation();
+ activation = new SharedNothingBackupActivation();
+ }
+
+            backupActivationThread = new Thread(activation, "Activation for server " + this);
+ backupActivationThread.start();
}
-         backupActivationThread = new Thread(activation, "Activation for server " + this);
- backupActivationThread.start();
+ // start connector service
+         connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice);
+ connectorsService.start();
}
-
- // start connector service
-      connectorsService = new ConnectorsService(configuration, storageManager, scheduledPool, postOffice);
- connectorsService.start();
+ finally
+ {
+ // this avoids embedded applications using dirty contexts from startup
+ OperationContextImpl.clearContext();
+ }
}
@Override
@@ -400,7 +417,7 @@
stopped = true;
stop(configuration.isFailoverOnServerShutdown());
}
-
+
public void threadDump(final String reason)
{
StringWriter str = new StringWriter();
@@ -445,20 +462,21 @@
}
connectorsService.stop();
-      //we stop the groupinghandler before we stop te cluster manager so binding mappings aren't removed in case of failover
+      // we stop the groupinghandler before we stop te cluster manager so binding mappings aren't removed in case of
+      // failover
if (groupingHandler != null)
{
managementService.removeNotificationListener(groupingHandler);
groupingHandler = null;
}
-
+
if (clusterManager != null)
{
clusterManager.stop();
}
}
-
+
// We close all the exception in an attempt to let any pending IO to finish
      // to avoid scenarios where the send or ACK got to disk but the response didn't get to the client
      // It may still be possible to have this scenario on a real failure (without the use of XA)
@@ -545,9 +563,9 @@
{
memoryManager.stop();
}
-
+
threadPool.shutdown();
-
+
scheduledPool.shutdown();
try
@@ -563,7 +581,6 @@
}
threadPool = null;
-
try
{
if (!scheduledPool.awaitTermination(10, TimeUnit.SECONDS))
@@ -577,7 +594,7 @@
}
threadPool = null;
-
+
scheduledPool = null;
pagingManager = null;
@@ -610,17 +627,20 @@
nodeManager.stop();
nodeManager = null;
-
+
addressSettingsRepository.clearListeners();
-
+
addressSettingsRepository.clearCache();
- HornetQServerImpl.log.info("HornetQ Server version " +
getVersion().getFullVersion() + " [" + tempNodeID + "] stopped");
+ HornetQServerImpl.log.info("HornetQ Server version " +
getVersion().getFullVersion() +
+ " [" +
+ tempNodeID +
+ "] stopped");
Logger.reset();
}
- }
+ }
// HornetQServer implementation
// -----------------------------------------------------------
@@ -629,33 +649,33 @@
{
StringWriter str = new StringWriter();
PrintWriter out = new PrintWriter(str);
-
+
out.println("Information about server " + this.identity);
out.println("Cluster Connection:" +
this.getClusterManager().describe());
-
+
return str.toString();
}
-
+
public void setIdentity(String identity)
{
this.identity = identity;
}
-
+
public String getIdentity()
{
return identity;
}
-
+
public ScheduledExecutorService getScheduledPool()
{
return scheduledPool;
}
-
+
public ExecutorService getThreadPool()
{
return threadPool;
}
-
+
public Configuration getConfiguration()
{
return configuration;
@@ -665,7 +685,7 @@
{
return mbeanServer;
}
-
+
public PagingManager getPagingManager()
{
return pagingManager;
@@ -695,7 +715,7 @@
{
return securityRepository;
}
-
+
public NodeManager getNodeManager()
{
return nodeManager;
@@ -802,7 +822,7 @@
{
      // getSessions is called here in a try to minimize locking the Server while this check is being done
Set<ServerSession> allSessions = getSessions();
-
+
for (ServerSession session : allSessions)
{
String metaValue = session.getMetaData(key);
@@ -811,10 +831,10 @@
return true;
}
}
-
+
return false;
}
-
+
public synchronized List<ServerSession> getSessions(final String connectionID)
{
      Set<Entry<String, ServerSession>> sessionEntries = sessions.entrySet();
@@ -865,7 +885,7 @@
public SimpleString getNodeID()
{
- return nodeManager == null?null:nodeManager.getNodeId();
+ return nodeManager == null ? null : nodeManager.getNodeId();
}
public Queue createQueue(final SimpleString address,
@@ -876,24 +896,24 @@
{
return createQueue(address, queueName, filterString, durable, temporary, false);
}
-
+
public Queue locateQueue(SimpleString queueName) throws Exception
{
Binding binding = postOffice.getBinding(queueName);
-
+
if (binding == null)
{
return null;
}
-
+
Bindable queue = binding.getBindable();
-
+
if (!(queue instanceof Queue))
{
throw new IllegalStateException("locateQueue should only be used to locate
queues");
}
-
- return (Queue) binding.getBindable();
+
+ return (Queue)binding.getBindable();
}
public Queue deployQueue(final SimpleString address,
@@ -919,7 +939,7 @@
}
Queue queue = (Queue)binding.getBindable();
-
+
if (queue.getPageSubscription() != null)
{
queue.getPageSubscription().close();
@@ -997,7 +1017,6 @@
return connectorsService;
}
-
public synchronized boolean checkActivate() throws Exception
{
if (configuration.isBackup())
@@ -1081,7 +1100,7 @@
managementService.registerDivert(divert, config);
}
-
+
public void destroyDivert(SimpleString name) throws Exception
{
Binding binding = postOffice.getBinding(name);
@@ -1097,8 +1116,6 @@
postOffice.removeBinding(name);
}
-
-
public void deployBridge(BridgeConfiguration config) throws Exception
{
if (clusterManager != null)
@@ -1106,7 +1123,7 @@
clusterManager.deployBridge(config, true);
}
}
-
+
public void destroyBridge(String name) throws Exception
{
if (clusterManager != null)
@@ -1114,14 +1131,14 @@
clusterManager.destroyBridge(name);
}
}
-
+
public ServerSession getSessionByID(String sessionName)
{
return sessions.get(sessionName);
}
-
+
// PUBLIC -------
-
+
public String toString()
{
if (identity != null)
@@ -1134,8 +1151,6 @@
}
}
-
-
// Package protected
// ----------------------------------------------------------------------------
@@ -1146,30 +1161,30 @@
* Protected so tests can change this behaviour
* @param backupConnector
*/
-// protected FailoverManagerImpl createBackupConnectionFailoverManager(final TransportConfiguration backupConnector,
-//                                                                     final ExecutorService threadPool,
-//                                                                     final ScheduledExecutorService scheduledPool)
-// {
-// return new FailoverManagerImpl((ClientSessionFactory)null,
-// backupConnector,
-// null,
-// false,
-// HornetQClient.DEFAULT_CALL_TIMEOUT,
-//                                HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
-// HornetQClient.DEFAULT_CONNECTION_TTL,
-// 0,
-// 1.0d,
-// 0,
-// 1,
-// false,
-// threadPool,
-// scheduledPool,
-// null);
-// }
+   // protected FailoverManagerImpl createBackupConnectionFailoverManager(final TransportConfiguration backupConnector,
+ // final ExecutorService threadPool,
+ // final ScheduledExecutorService scheduledPool)
+ // {
+ // return new FailoverManagerImpl((ClientSessionFactory)null,
+ // backupConnector,
+ // null,
+ // false,
+ // HornetQClient.DEFAULT_CALL_TIMEOUT,
+ // HornetQClient.DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ // HornetQClient.DEFAULT_CONNECTION_TTL,
+ // 0,
+ // 1.0d,
+ // 0,
+ // 1,
+ // false,
+ // threadPool,
+ // scheduledPool,
+ // null);
+ // }
protected PagingManager createPagingManager()
{
-
+
      return new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
(long)configuration.getJournalBufferSize_NIO(),
scheduledPool,
@@ -1313,11 +1328,9 @@
nodeManager.getUUID(),
configuration.isBackup(),
configuration.isClustered());
-
clusterManager.deploy();
-
      remotingService = new RemotingServiceImpl(clusterManager, configuration, this, managementService, scheduledPool);
messagingServerControl = managementService.registerServer(postOffice,
@@ -1341,7 +1354,7 @@
addressSettingsDeployer.start();
}
-
+
deployAddressSettingsFromConfiguration();
storageManager.start();
@@ -1385,15 +1398,14 @@
private void initialisePart2() throws Exception
{
// Load the journal and populate queues, transactions and caches in memory
-
-
+
if (stopped)
{
return;
}
-
+
pagingManager.reloadStores();
-
+
JournalLoadInformation[] journalInfo = loadJournals();
compareJournals(journalInfo);
@@ -1507,13 +1519,17 @@
for (QueueBindingInfo queueBindingInfo : queueBindingInfos)
{
queueBindingInfosMap.put(queueBindingInfo.getId(), queueBindingInfo);
-
-         if (queueBindingInfo.getFilterString() == null || !queueBindingInfo.getFilterString().toString().equals(GENERIC_IGNORED_FILTER))
+
+         if (queueBindingInfo.getFilterString() == null || !queueBindingInfo.getFilterString()
+                                                                            .toString()
+                                                                            .equals(GENERIC_IGNORED_FILTER))
{
Filter filter = FilterImpl.createFilter(queueBindingInfo.getFilterString());
-
-         PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress()).getCursorProvier().createSubscription(queueBindingInfo.getId(), filter, true);
-
+
+         PageSubscription subscription = pagingManager.getPageStore(queueBindingInfo.getAddress())
+                                                      .getCursorProvier()
+                                                      .createSubscription(queueBindingInfo.getId(), filter, true);
+
Queue queue = queueFactory.createQueue(queueBindingInfo.getId(),
queueBindingInfo.getAddress(),
queueBindingInfo.getQueueName(),
@@ -1521,18 +1537,17 @@
subscription,
true,
false);
-
+
         Binding binding = new LocalQueueBinding(queueBindingInfo.getAddress(), queue, nodeManager.getNodeId());
-
+
queues.put(queueBindingInfo.getId(), queue);
-
+
postOffice.addBinding(binding);
-
+
managementService.registerAddress(queueBindingInfo.getAddress());
         managementService.registerQueue(queue, queueBindingInfo.getAddress(), storageManager);
}
-
-
+
}
for (GroupingInfo groupingInfo : groupingInfos)
@@ -1546,7 +1561,7 @@
}
      Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
-
+
      HashSet<Pair<Long, Long>> pendingLargeMessages = new HashSet<Pair<Long, Long>>();
journalInfo[1] = storageManager.loadMessageJournal(postOffice,
@@ -1568,7 +1583,7 @@
cache.load(entry.getValue());
}
}
-
+
for (Pair<Long, Long> msgToDelete : pendingLargeMessages)
{
log.info("Deleting pending large message as it wasn't completed:"
+ msgToDelete);
@@ -1631,19 +1646,20 @@
}
Filter filter = FilterImpl.createFilter(filterString);
-
+
long queueID = storageManager.generateUniqueID();
PageSubscription pageSubscription;
-
-
+
      if (filterString != null && filterString.toString().equals(GENERIC_IGNORED_FILTER))
{
pageSubscription = null;
}
else
{
-         pageSubscription = pagingManager.getPageStore(address).getCursorProvier().createSubscription(queueID, filter, durable);
+ pageSubscription = pagingManager.getPageStore(address)
+ .getCursorProvier()
+ .createSubscription(queueID, filter, durable);
}
final Queue queue = queueFactory.createQueue(queueID,
@@ -1759,32 +1775,31 @@
}
else
{
- throw new IllegalArgumentException("Directory " + journalDir +
- " does not exist and will not be created");
+ throw new IllegalArgumentException("Directory " + journalDir +
" does not exist and will not be created");
}
}
}
-
+
/**
* To be called by backup trying to fail back the server
*/
private void startFailbackChecker()
{
-      scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l, TimeUnit.MILLISECONDS);
+      scheduledPool.scheduleAtFixedRate(new FailbackChecker(), 1000l, 1000l, TimeUnit.MILLISECONDS);
}
-
// Inner classes
// --------------------------------------------------------------------------------
-
+
class FailbackChecker implements Runnable
{
boolean restarting = false;
+
public void run()
{
try
{
- if(!restarting && nodeManager.isAwaitingFailback())
+ if (!restarting && nodeManager.isAwaitingFailback())
{
log.info("live server wants to restart, restarting server in
backup");
restarting = true;
@@ -1819,8 +1834,6 @@
}
}
-
-
private class SharedStoreLiveActivation implements Activation
{
public void run()
@@ -1830,7 +1843,7 @@
log.info("Waiting to obtain live lock");
checkJournalDirectory();
-
+
if (log.isDebugEnabled())
{
log.debug("First part initialization on " + this);
@@ -1838,9 +1851,10 @@
initialisePart1();
- if(nodeManager.isBackupLive())
+ if (nodeManager.isBackupLive())
{
-               //looks like we've failed over at some point need to inform that we are the backup so when the current live
+               // looks like we've failed over at some point need to inform that we are the backup so when the current
+               // live
// goes down they failover to us
if (log.isDebugEnabled())
{
@@ -1857,9 +1871,9 @@
{
return;
}
-
+
initialisePart2();
-
+
log.info("Server is now live");
}
catch (Exception e)
@@ -1870,7 +1884,7 @@
public void close(boolean permanently) throws Exception
{
- if(permanently)
+ if (permanently)
{
nodeManager.crashLiveServer();
}
@@ -1881,7 +1895,6 @@
}
}
-
private class SharedStoreBackupActivation implements Activation
{
public void run()
@@ -1891,50 +1904,53 @@
nodeManager.startBackup();
initialisePart1();
-
+
clusterManager.start();
started = true;
- log.info("HornetQ Backup Server version " +
getVersion().getFullVersion() + " [" + nodeManager.getNodeId() + "]
started, waiting live to fail before it gets active");
+ log.info("HornetQ Backup Server version " +
getVersion().getFullVersion() +
+ " [" +
+ nodeManager.getNodeId() +
+ "] started, waiting live to fail before it gets active");
nodeManager.awaitLiveNode();
-
+
configuration.setBackup(false);
-
+
if (stopped)
{
return;
}
-
+
initialisePart2();
-
+
clusterManager.activate();
log.info("Backup Server is now live");
nodeManager.releaseBackup();
- if(configuration.isAllowAutoFailBack())
+ if (configuration.isAllowAutoFailBack())
{
startFailbackChecker();
}
}
catch (InterruptedException e)
{
- //this is ok, we are being stopped
+ // this is ok, we are being stopped
}
catch (ClosedChannelException e)
{
- //this is ok too, we are being stopped
+ // this is ok too, we are being stopped
}
catch (Exception e)
{
- if(!(e.getCause() instanceof InterruptedException))
+ if (!(e.getCause() instanceof InterruptedException))
{
log.error("Failure in initialisation", e);
}
}
- catch(Throwable e)
+ catch (Throwable e)
{
log.error("Failure in initialisation", e);
}
@@ -1956,7 +1972,7 @@
nodeManager.interrupt();
backupActivationThread.interrupt();
-
+
backupActivationThread.join(1000);
}
@@ -1970,10 +1986,10 @@
}
else
{
- //if we are now live, behave as live
+ // if we are now live, behave as live
         // We need to delete the file too, otherwise the backup will failover when we shutdown or if the backup is
// started before the live
- if(permanently)
+ if (permanently)
{
nodeManager.crashLiveServer();
}
@@ -1984,7 +2000,7 @@
}
}
}
-
+
private interface Activation extends Runnable
{
void close(boolean permanently) throws Exception;
@@ -2044,5 +2060,4 @@
}
}
-
}
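
For context on the GENERIC_IGNORED_FILTER hunks above: a binding whose filter string equals "__HQX=-1" deliberately gets no page subscription, so paging skips the dummy subscription that JMS topics keep around for TCK purposes. A minimal sketch of that check, assuming the illustrative helper name isGloballyIgnored (not part of the class):

   import org.hornetq.api.core.SimpleString;

   static boolean isGloballyIgnored(final SimpleString filterString)
   {
      // true only for the global-ignore filter declared on HornetQServerImpl
      return filterString != null && filterString.toString().equals(GENERIC_IGNORED_FILTER);
   }

   // usage, mirroring the createQueue hunk:
   // pageSubscription = isGloballyIgnored(filterString) ? null
   //    : pagingManager.getPageStore(address).getCursorProvier().createSubscription(queueID, filter, durable);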