JBoss hornetq SVN: r10781 - in branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server: management and 1 other directory.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-06 22:41:29 -0400 (Mon, 06 Jun 2011)
New Revision: 10781
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/JMSManagementService.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
Log:
avoiding NPE
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-06-07 02:24:54 UTC (rev 10780)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-06-07 02:41:29 UTC (rev 10781)
@@ -1042,9 +1042,10 @@
SimpleString.toSimpleString(coreFilterString),
durable,
false);
+
queues.put(queueName, hqQueue);
- jmsManagementService.registerQueue(hqQueue);
+ jmsManagementService.registerQueue(hqQueue, queue);
return true;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/JMSManagementService.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/JMSManagementService.java 2011-06-07 02:24:54 UTC (rev 10780)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/JMSManagementService.java 2011-06-07 02:41:29 UTC (rev 10781)
@@ -14,6 +14,7 @@
package org.hornetq.jms.server.management;
import org.hornetq.api.jms.management.JMSServerControl;
+import org.hornetq.core.server.Queue;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.jms.client.HornetQTopic;
@@ -32,7 +33,7 @@
void unregisterJMSServer() throws Exception;
- void registerQueue(HornetQQueue queue) throws Exception;
+ void registerQueue(HornetQQueue queue, Queue serverQueue) throws Exception;
void unregisterQueue(String name) throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2011-06-07 02:24:54 UTC (rev 10780)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2011-06-07 02:41:29 UTC (rev 10781)
@@ -87,9 +87,8 @@
managementService.unregisterFromRegistry(ResourceNames.JMS_SERVER);
}
- public synchronized void registerQueue(final HornetQQueue queue) throws Exception
+ public synchronized void registerQueue(final HornetQQueue queue, final Queue serverQueue) throws Exception
{
- Queue serverQueue = server.locateQueue(new SimpleString(queue.getName()));
QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue.getAddress());
MessageCounterManager messageCounterManager = managementService.getMessageCounterManager();
MessageCounter counter = new MessageCounter(queue.getName(),
13 years, 6 months
JBoss hornetq SVN: r10780 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-06 22:24:54 -0400 (Mon, 06 Jun 2011)
New Revision: 10780
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
Log:
avoiding NPE
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-07 01:55:44 UTC (rev 10779)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2011-06-07 02:24:54 UTC (rev 10780)
@@ -1032,6 +1032,11 @@
{
Binding binding = postOffice.getBinding(queueName);
+ if (binding == null)
+ {
+ return null;
+ }
+
Bindable queue = binding.getBindable();
if (!(queue instanceof Queue))
13 years, 6 months
JBoss hornetq SVN: r10779 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-06 21:55:44 -0400 (Mon, 06 Jun 2011)
New Revision: 10779
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
Log:
just logging
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-06-07 01:45:23 UTC (rev 10778)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/Topology.java 2011-06-07 01:55:44 UTC (rev 10779)
@@ -54,9 +54,9 @@
TopologyMember currentMember = topology.get(nodeId);
if (debug)
{
- log.info("adding = " + nodeId + ":" + member.getConnector());
- log.info("before----------------------------------");
- log.info(describe());
+ log.debug("adding = " + nodeId + ":" + member.getConnector());
+ log.debug("before----------------------------------");
+ log.debug(describe());
}
if(currentMember == null)
{
@@ -87,8 +87,8 @@
}
if(debug)
{
- log.info("Topology updated=" + replaced);
- log.info(describe());
+ log.debug("Topology updated=" + replaced);
+ log.debug(describe());
}
return replaced;
}
@@ -98,7 +98,7 @@
TopologyMember member = topology.remove(nodeId);
if (debug)
{
- log.info("Removing member " + member);
+ log.debug("Removing member " + member);
}
return (member != null);
}
13 years, 6 months
JBoss hornetq SVN: r10778 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/server and 6 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-06 21:45:23 -0400 (Mon, 06 Jun 2011)
New Revision: 10778
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/messagecounter/MessageCounter.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
Log:
JBPAPP-6646 - performance issue on paging - avoiding locking through scheduledExecutor and blockOnExecutor for Counters
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/messagecounter/MessageCounter.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/messagecounter/MessageCounter.java 2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/messagecounter/MessageCounter.java 2011-06-07 01:45:23 UTC (rev 10778)
@@ -27,7 +27,7 @@
import java.util.GregorianCalendar;
import java.util.List;
-import org.hornetq.api.core.management.QueueControl;
+import org.hornetq.core.server.Queue;
/**
* This class stores message count informations for a given queue
@@ -58,8 +58,7 @@
private final boolean destDurable;
- // destination queue
- private final QueueControl destQueue;
+ private final Queue serverQueue;
// counter
private long countTotal;
@@ -95,7 +94,7 @@
*/
public MessageCounter(final String name,
final String subscription,
- final QueueControl queue,
+ final Queue serverQueue,
final boolean topic,
final boolean durable,
final int daycountmax)
@@ -105,7 +104,7 @@
destSubscription = subscription;
destTopic = topic;
destDurable = durable;
- destQueue = queue;
+ this.serverQueue = serverQueue;
// initialize counter
resetCounter();
@@ -115,7 +114,33 @@
setHistoryLimit(daycountmax);
}
+
+ private Runnable onTimeExecutor = new Runnable()
+ {
+ public void run()
+ {
+ long latestMessagesAdded = serverQueue.getInstantMessagesAdded();
+ long newMessagesAdded = latestMessagesAdded - lastMessagesAdded;
+
+ countTotal += newMessagesAdded;
+
+ lastMessagesAdded = latestMessagesAdded;
+
+ if (newMessagesAdded > 0)
+ {
+ timeLastAdd = System.currentTimeMillis();
+ }
+
+ // update timestamp
+ timeLastUpdate = System.currentTimeMillis();
+
+ // update message history
+ updateHistory(newMessagesAdded);
+
+ }
+ };
+
// Public --------------------------------------------------------
/*
@@ -123,24 +148,11 @@
*/
public synchronized void onTimer()
{
- long latestMessagesAdded = destQueue.getMessagesAdded();
-
- long newMessagesAdded = latestMessagesAdded - lastMessagesAdded;
-
- countTotal += newMessagesAdded;
-
- lastMessagesAdded = latestMessagesAdded;
-
- if (newMessagesAdded > 0)
- {
- timeLastAdd = System.currentTimeMillis();
- }
-
- // update timestamp
- timeLastUpdate = System.currentTimeMillis();
-
- // update message history
- updateHistory(newMessagesAdded);
+ // Actor approach here: Instead of having the Counter locking the queue, we will use the Queue's executor
+ // instead of possibly making an lock on the queue.
+ // This way the scheduled Threads will be free to keep doing their pings in case the server is busy with paging or
+ // any other deliveries
+ serverQueue.getExecutor().execute(onTimeExecutor);
}
public String getDestinationName()
@@ -190,7 +202,7 @@
*/
public long getMessageCount()
{
- return destQueue.getMessageCount();
+ return serverQueue.getInstantMessageCount();
}
/**
@@ -199,7 +211,7 @@
*/
public long getMessageCountDelta()
{
- long current = destQueue.getMessageCount();
+ long current = serverQueue.getInstantMessageCount();
int delta = (int)(current - depthLast);
depthLast = current;
@@ -334,8 +346,8 @@
destTopic +
", destDurable=" +
destDurable +
- ", destQueue=" +
- destQueue +
+ ", serverQueue =" +
+ serverQueue +
"]";
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/Queue.java 2011-06-07 01:45:23 UTC (rev 10778)
@@ -76,6 +76,9 @@
void forceDelivery();
long getMessageCount();
+
+ /** Return the current message count without waiting for scheduled executors to finish */
+ long getInstantMessageCount();
int getDeliveringCount();
@@ -86,6 +89,8 @@
List<MessageReference> getScheduledMessages();
long getMessagesAdded();
+
+ long getInstantMessagesAdded();
MessageReference removeReferenceWithID(long id) throws Exception;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/impl/QueueImpl.java 2011-06-07 01:45:23 UTC (rev 10778)
@@ -727,7 +727,12 @@
public long getMessageCount()
{
blockOnExecutorFuture();
-
+
+ return getInstantMessageCount();
+ }
+
+ public long getInstantMessageCount()
+ {
synchronized (this)
{
if (pageSubscription != null)
@@ -892,7 +897,12 @@
public long getMessagesAdded()
{
blockOnExecutorFuture();
-
+
+ return getInstantMessagesAdded();
+ }
+
+ public long getInstantMessagesAdded()
+ {
synchronized (this)
{
if (pageSubscription != null)
@@ -904,7 +914,7 @@
return messagesAdded;
}
}
- }
+ }
public int deleteAllReferences() throws Exception
{
@@ -1543,6 +1553,8 @@
holder.iter.remove();
refRemoved(ref);
+
+ handled++;
continue;
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/server/management/impl/ManagementServiceImpl.java 2011-06-07 01:45:23 UTC (rev 10778)
@@ -246,7 +246,7 @@
{
MessageCounter counter = new MessageCounter(queue.getName().toString(),
null,
- queueControl,
+ queue,
false,
queue.isDurable(),
messageCounterManager.getMaxDayCount());
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/impl/JMSServerManagerImpl.java 2011-06-07 01:45:23 UTC (rev 10778)
@@ -48,6 +48,7 @@
import org.hornetq.core.security.Role;
import org.hornetq.core.server.ActivateCallback;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.server.impl.HornetQServerImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.ResourceManager;
@@ -193,7 +194,7 @@
{
active = true;
- jmsManagementService = new JMSManagementServiceImpl(server.getManagementService(), this);
+ jmsManagementService = new JMSManagementServiceImpl(server.getManagementService(), server, this);
try
{
@@ -1036,7 +1037,7 @@
coreFilterString = SelectorTranslator.convertToHornetQFilterString(selectorString);
}
- server.deployQueue(SimpleString.toSimpleString(hqQueue.getAddress()),
+ Queue queue = server.deployQueue(SimpleString.toSimpleString(hqQueue.getAddress()),
SimpleString.toSimpleString(hqQueue.getAddress()),
SimpleString.toSimpleString(coreFilterString),
durable,
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2011-06-07 01:45:23 UTC (rev 10778)
@@ -15,6 +15,7 @@
import javax.management.ObjectName;
+import org.hornetq.api.core.SimpleString;
import org.hornetq.api.core.management.AddressControl;
import org.hornetq.api.core.management.QueueControl;
import org.hornetq.api.core.management.ResourceNames;
@@ -24,6 +25,8 @@
import org.hornetq.api.jms.management.TopicControl;
import org.hornetq.core.messagecounter.MessageCounter;
import org.hornetq.core.messagecounter.MessageCounterManager;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.server.management.ManagementService;
import org.hornetq.jms.client.HornetQConnectionFactory;
import org.hornetq.jms.client.HornetQDestination;
@@ -52,13 +55,16 @@
private final ManagementService managementService;
private final JMSServerManager jmsServerManager;
+
+ private final HornetQServer server;
// Static --------------------------------------------------------
- public JMSManagementServiceImpl(final ManagementService managementService, final JMSServerManager jmsServerManager)
+ public JMSManagementServiceImpl(final ManagementService managementService, final HornetQServer server, final JMSServerManager jmsServerManager)
{
this.managementService = managementService;
this.jmsServerManager = jmsServerManager;
+ this.server = server;
}
// Public --------------------------------------------------------
@@ -83,11 +89,12 @@
public synchronized void registerQueue(final HornetQQueue queue) throws Exception
{
+ Queue serverQueue = server.locateQueue(new SimpleString(queue.getName()));
QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue.getAddress());
MessageCounterManager messageCounterManager = managementService.getMessageCounterManager();
MessageCounter counter = new MessageCounter(queue.getName(),
null,
- coreQueueControl,
+ serverQueue,
false,
coreQueueControl.isDurable(),
messageCounterManager.getMaxDayCount());
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2011-06-07 01:45:23 UTC (rev 10778)
@@ -665,4 +665,22 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getInstantMessageCount()
+ */
+ public long getInstantMessageCount()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getInstantMessagesAdded()
+ */
+ public long getInstantMessagesAdded()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
\ No newline at end of file
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-06-06 20:25:18 UTC (rev 10777)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/unit/core/server/cluster/impl/RemoteQueueBindImplTest.java 2011-06-07 01:45:23 UTC (rev 10778)
@@ -659,6 +659,24 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getInstantMessageCount()
+ */
+ public long getInstantMessageCount()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#getInstantMessagesAdded()
+ */
+ public long getInstantMessagesAdded()
+ {
+ // TODO Auto-generated method stub
+ return 0;
+ }
+
}
}
13 years, 6 months
JBoss hornetq SVN: r10777 - branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-06 16:25:18 -0400 (Mon, 06 Jun 2011)
New Revision: 10777
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessageProducer.java
Log:
Reverting fix to make it JMS API compliant (I had misinterpreted a rule)
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessageProducer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessageProducer.java 2011-06-06 15:33:59 UTC (rev 10776)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessageProducer.java 2011-06-06 20:25:18 UTC (rev 10777)
@@ -314,17 +314,14 @@
private void doSend(final Message message, final long timeToLive, HornetQDestination destination) throws JMSException
{
- if (message.getJMSExpiration() == 0)
+ if (timeToLive == 0)
{
- if (timeToLive == 0)
- {
- message.setJMSExpiration(0);
- }
- else
- {
- message.setJMSExpiration(System.currentTimeMillis() + timeToLive);
- }
+ message.setJMSExpiration(0);
}
+ else
+ {
+ message.setJMSExpiration(System.currentTimeMillis() + timeToLive);
+ }
if (!disableMessageTimestamp)
{
13 years, 6 months
JBoss hornetq SVN: r10776 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-06 11:33:59 -0400 (Mon, 06 Jun 2011)
New Revision: 10776
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
Log:
HORNETQ-714 - fixing testcase
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-06-06 03:15:13 UTC (rev 10775)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-06-06 15:33:59 UTC (rev 10776)
@@ -356,26 +356,22 @@
{
return freeFiles.size();
}
-
/**
- * Add directly to the freeFiles structure without reinitializing the file.
- * used on load() only
+ * @param file
+ * @throws Exception
*/
- public void addFreeFileNoInit(final JournalFile file)
+ public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
{
- freeFiles.add(file);
-
- if (CHECK_CONSISTENCE)
- {
- checkDataFiles();
- }
+ addFreeFile(file, renameTmp, true);
}
/**
* @param file
+ * @param renameTmp - should rename the file as it's being added to free files
+ * @param checkDelete - should delete the file if max condition has been met
* @throws Exception
*/
- public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp) throws Exception
+ public synchronized void addFreeFile(final JournalFile file, final boolean renameTmp, final boolean checkDelete) throws Exception
{
long calculatedSize = 0;
try
@@ -395,7 +391,7 @@
}
else
// FIXME - size() involves a scan!!!
- if (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles)
+ if (!checkDelete || (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() < minFiles))
{
// Re-initialise it
@@ -415,6 +411,17 @@
}
else
{
+ if (trace)
+ {
+ log.trace("DataFiles.size() = " + dataFiles.size());
+ log.trace("openedFiles.size() = " + openedFiles.size());
+ log.trace("minfiles = " + minFiles);
+ log.trace("Free Files = " + freeFiles.size());
+ log.trace("File " + file +
+ " being deleted as freeFiles.size() + dataFiles.size() + 1 + openedFiles.size() (" +
+ (freeFiles.size() + dataFiles.size() + 1 + openedFiles.size()) +
+ ") < minFiles (" + minFiles + ")" );
+ }
file.getFile().delete();
}
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-06-06 03:15:13 UTC (rev 10775)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-06-06 15:33:59 UTC (rev 10776)
@@ -2155,7 +2155,7 @@
if (changeData)
{
// Empty dataFiles with no data
- filesRepository.addFreeFile(file, false);
+ filesRepository.addFreeFile(file, false, false);
}
}
}
13 years, 6 months
JBoss hornetq SVN: r10775 - branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-05 23:15:13 -0400 (Sun, 05 Jun 2011)
New Revision: 10775
Modified:
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
Log:
Reverting commit
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-06-06 03:12:33 UTC (rev 10774)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-06-06 03:15:13 UTC (rev 10775)
@@ -3677,18 +3677,6 @@
}
}
- public void testLoop() throws Exception
- {
- for (int i = 0 ; i < 1000; i++)
- {
- log.warn("#test " + i);
- testDLAOnLargeMessageAndPaging();
- tearDown();
- setUp();
- }
-
- }
-
public void testDLAOnLargeMessageAndPaging() throws Exception
{
clearData();
13 years, 6 months
JBoss hornetq SVN: r10774 - in branches/Branch_2_2_EAP: src/main/org/hornetq/core/persistence/impl/journal and 2 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-05 23:12:33 -0400 (Sun, 05 Jun 2011)
New Revision: 10774
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
Log:
HORNETQ-714 - fixing out of ordering issue after compacting
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-06-03 15:21:19 UTC (rev 10773)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalFilesRepository.java 2011-06-06 03:12:33 UTC (rev 10774)
@@ -42,6 +42,10 @@
private static final Logger log = Logger.getLogger(JournalFilesRepository.class);
private static final boolean trace = JournalFilesRepository.log.isTraceEnabled();
+
+ // Used to debug the consistency of the journal ordering.
+ // This is meant to be false as these extra checks would cause performance issues
+ private static final boolean CHECK_CONSISTENCE = false;
// This method exists just to make debug easier.
// I could replace log.trace by log.info temporarily while I was debugging
@@ -56,6 +60,8 @@
// Attributes ----------------------------------------------------
private final SequentialFileFactory fileFactory;
+
+ private final JournalImpl journal;
private final BlockingDeque<JournalFile> dataFiles = new LinkedBlockingDeque<JournalFile>();
@@ -78,12 +84,30 @@
private final int userVersion;
private Executor openFilesExecutor;
+
+ private Runnable pushOpenRunnable = new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ pushOpenedFile();
+ }
+ catch (Exception e)
+ {
+ JournalFilesRepository.log.error(e.getMessage(), e);
+ }
+ }
+ };
+
+
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
-
+
public JournalFilesRepository(final SequentialFileFactory fileFactory,
+ final JournalImpl journal,
final String filePrefix,
final String fileExtension,
final int userVersion,
@@ -98,6 +122,7 @@
this.minFiles = minFiles;
this.fileSize = fileSize;
this.userVersion = userVersion;
+ this.journal = journal;
}
// Public --------------------------------------------------------
@@ -233,11 +258,96 @@
public void addDataFileOnTop(final JournalFile file)
{
dataFiles.addFirst(file);
+
+ if (CHECK_CONSISTENCE)
+ {
+ checkDataFiles();
+ }
}
+
+ public String debugFiles()
+ {
+ StringBuffer buffer = new StringBuffer();
+
+ buffer.append("**********\nCurrent File = " + journal.getCurrentFile() + "\n");
+ buffer.append("**********\nDataFiles:\n");
+ for (JournalFile file : dataFiles)
+ {
+ buffer.append(file.toString() + "\n");
+ }
+ buffer.append("*********\nFreeFiles:\n");
+ for (JournalFile file : freeFiles)
+ {
+ buffer.append(file.toString() + "\n");
+ }
+ return buffer.toString();
+ }
+
+ public synchronized void checkDataFiles()
+ {
+ long seq = -1;
+ for (JournalFile file : dataFiles)
+ {
+ if (file.getFileID() <= seq)
+ {
+ log.info("CheckDataFiles:");
+ log.info(debugFiles());
+ log.info("Sequence out of order on journal");
+ System.exit(-1);
+ }
+
+ if (journal.getCurrentFile() != null && journal.getCurrentFile().getFileID() <= file.getFileID())
+ {
+ log.info("CheckDataFiles:");
+ log.info(debugFiles());
+ log.info("CurrentFile on the journal is <= the sequence file.getFileID=" + file.getFileID() + " on the dataFiles");
+ log.info("Currentfile.getFileId=" + journal.getCurrentFile().getFileID() + " while the file.getFileID()=" + file.getFileID());
+ log.info("IsSame = (" + (journal.getCurrentFile() == file) + ")");
+
+ // throw new RuntimeException ("Check failure!");
+ }
+
+ if (journal.getCurrentFile() == file)
+ {
+ throw new RuntimeException ("Check failure! Current file listed as data file!");
+ }
+
+ seq = file.getFileID();
+ }
+
+ long lastFreeId = -1;
+ for (JournalFile file : freeFiles)
+ {
+ if (file.getFileID() <= lastFreeId)
+ {
+ log.info("CheckDataFiles:");
+ log.info(debugFiles());
+ log.info("FreeFileID out of order ");
+
+ throw new RuntimeException ("Check failure!");
+ }
+
+ lastFreeId= file.getFileID();
+
+ if (file.getFileID() < seq)
+ {
+ log.info("CheckDataFiles:");
+ log.info(debugFiles());
+ log.info("A FreeFile is less then the maximum data");
+
+ // throw new RuntimeException ("Check failure!");
+ }
+ }
+ }
public void addDataFileOnBottom(final JournalFile file)
{
dataFiles.add(file);
+
+ if (CHECK_CONSISTENCE)
+ {
+ checkDataFiles();
+ }
}
// Free File Operations ==========================================
@@ -254,6 +364,11 @@
public void addFreeFileNoInit(final JournalFile file)
{
freeFiles.add(file);
+
+ if (CHECK_CONSISTENCE)
+ {
+ checkDataFiles();
+ }
}
/**
@@ -302,6 +417,11 @@
{
file.getFile().delete();
}
+
+ if (CHECK_CONSISTENCE)
+ {
+ checkDataFiles();
+ }
}
public Collection<JournalFile> getFreeFiles()
@@ -333,28 +453,13 @@
JournalFilesRepository.trace("enqueueOpenFile with openedFiles.size=" + openedFiles.size());
}
- Runnable run = new Runnable()
- {
- public void run()
- {
- try
- {
- pushOpenedFile();
- }
- catch (Exception e)
- {
- JournalFilesRepository.log.error(e.getMessage(), e);
- }
- }
- };
-
if (openFilesExecutor == null)
{
- run.run();
+ pushOpenRunnable.run();
}
else
{
- openFilesExecutor.execute(run);
+ openFilesExecutor.execute(pushOpenRunnable);
}
JournalFile nextFile = null;
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-06-03 15:21:19 UTC (rev 10773)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2011-06-06 03:12:33 UTC (rev 10774)
@@ -292,6 +292,7 @@
this.fileFactory = fileFactory;
filesRepository = new JournalFilesRepository(fileFactory,
+ this,
filePrefix,
fileExtension,
userVersion,
@@ -1484,7 +1485,7 @@
public synchronized JournalLoadInformation load(final List<RecordInfo> committedRecords,
final List<PreparedTransactionInfo> preparedTransactions,
final TransactionFailureCallback failureCallback,
- final boolean fixBadTX) throws Exception
+ final boolean changeData) throws Exception
{
final Set<Long> recordsToDelete = new HashSet<Long>();
// ArrayList was taking too long to delete elements on checkDeleteSize
@@ -1554,7 +1555,7 @@
failureCallback.failedTransaction(transactionID, records, recordsToDelete);
}
}
- }, fixBadTX);
+ }, changeData);
for (RecordInfo record : records)
{
@@ -1640,16 +1641,8 @@
try
{
- if (JournalImpl.trace)
- {
- JournalImpl.trace("Starting compacting operation on journal");
- }
+ log.debug("Starting compacting operation on journal");
- if (JournalImpl.TRACE_RECORDS)
- {
- JournalImpl.traceRecord("Starting compacting operation on journal");
- }
-
onCompactStart();
// We need to guarantee that the journal is frozen for this short time
@@ -1806,16 +1799,8 @@
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
- if (JournalImpl.trace)
- {
- trace("Finished compacting on journal");
- }
-
- if (JournalImpl.TRACE_RECORDS)
- {
- JournalImpl.traceRecord("Finished compacting on journal");
- }
-
+ log.debug("Finished compacting on journal");
+
}
finally
{
@@ -1879,7 +1864,7 @@
return load(loadManager, true);
}
- public synchronized JournalLoadInformation load(final LoaderCallback loadManager, boolean fixFailingTransactions) throws Exception
+ public synchronized JournalLoadInformation load(final LoaderCallback loadManager, final boolean changeData) throws Exception
{
if (state != JournalImpl.STATE_STARTED)
{
@@ -2167,8 +2152,11 @@
}
else
{
- // Empty dataFiles with no data
- filesRepository.addFreeFileNoInit(file);
+ if (changeData)
+ {
+ // Empty dataFiles with no data
+ filesRepository.addFreeFile(file, false);
+ }
}
}
@@ -2206,7 +2194,7 @@
JournalImpl.log.warn("Uncommitted transaction with id " + transaction.transactionID +
" found and discarded");
- if (fixFailingTransactions)
+ if (changeData)
{
// I append a rollback record here, because otherwise compacting will be throwing messages because of unknown transactions
this.appendRollbackRecord(transaction.transactionID, false);
@@ -2998,7 +2986,7 @@
if (JournalImpl.trace)
{
- JournalImpl.trace("moveNextFile: " + currentFile);
+ log.trace("moveNextFile: " + currentFile);
}
fileFactory.activateBuffer(currentFile.getFile());
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-06-03 15:21:19 UTC (rev 10773)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2011-06-06 03:12:33 UTC (rev 10774)
@@ -910,11 +910,13 @@
if (message == null)
{
- throw new IllegalStateException("Cannot find message " + record.id);
+ log.error("Cannot find message " + record.id);
}
+ else
+ {
+ queueMessages.put(messageID, new AddMessageRecord(message));
+ }
- queueMessages.put(messageID, new AddMessageRecord(message));
-
break;
}
case ACKNOWLEDGE_REF:
@@ -929,14 +931,16 @@
if (queueMessages == null)
{
- throw new IllegalStateException("Cannot find queue messages " + encoding.queueID);
+ log.error("Cannot find queue messages for queueID=" + encoding.queueID + " on ack for messageID=" + messageID);
}
-
- AddMessageRecord rec = queueMessages.remove(messageID);
-
- if (rec == null)
+ else
{
- throw new IllegalStateException("Cannot find message " + messageID);
+ AddMessageRecord rec = queueMessages.remove(messageID);
+
+ if (rec == null)
+ {
+ log.error("Cannot find message " + messageID);
+ }
}
break;
@@ -1008,18 +1012,23 @@
if (queueMessages == null)
{
- throw new IllegalStateException("Cannot find queue messages " + encoding.queueID);
+ log.error("Cannot find queue messages " + encoding.queueID + " for message " + messageID + " while processing scheduled messages");
}
-
- AddMessageRecord rec = queueMessages.get(messageID);
-
- if (rec == null)
+ else
{
- throw new IllegalStateException("Cannot find message " + messageID);
+
+ AddMessageRecord rec = queueMessages.get(messageID);
+
+ if (rec == null)
+ {
+ log.error("Cannot find message " + messageID);
+ }
+ else
+ {
+ rec.scheduledDeliveryTime = encoding.scheduledDeliveryTime;
+ }
}
- rec.scheduledDeliveryTime = encoding.scheduledDeliveryTime;
-
break;
}
case DUPLICATE_ID:
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-06-03 15:21:19 UTC (rev 10773)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2011-06-06 03:12:33 UTC (rev 10774)
@@ -3677,11 +3677,24 @@
}
}
+ public void testLoop() throws Exception
+ {
+ for (int i = 0 ; i < 1000; i++)
+ {
+ log.warn("#test " + i);
+ testDLAOnLargeMessageAndPaging();
+ tearDown();
+ setUp();
+ }
+
+ }
+
public void testDLAOnLargeMessageAndPaging() throws Exception
{
clearData();
Configuration config = createDefaultConfig();
+ config.setThreadPoolMaxSize(5);
config.setJournalSyncNonTransactional(false);
@@ -3776,12 +3789,14 @@
assertNotNull("Message " + i + " wasn't received", message);
message.acknowledge();
+ final AtomicInteger bytesOutput = new AtomicInteger(0);
+
message.setOutputStream(new OutputStream()
{
@Override
public void write(int b) throws IOException
{
-
+ bytesOutput.incrementAndGet();
}
});
@@ -3795,8 +3810,10 @@
}
catch (Throwable e)
{
+ log.info("output bytes = " + bytesOutput);
log.info(threadDump("dump"));
- fail("Couldn't finish large message receiving for id=" + message.getStringProperty("id") + " with messageID=" + message.getMessageID());
+ fail("Couldn't finish large message receiving for id=" +
+ message.getStringProperty("id") + " with messageID=" + message.getMessageID());
}
}
Modified: branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java
===================================================================
--- branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2011-06-03 15:21:19 UTC (rev 10773)
+++ branches/Branch_2_2_EAP/tests/src/org/hornetq/tests/integration/journal/NIOJournalCompactTest.java 2011-06-06 03:12:33 UTC (rev 10774)
@@ -779,8 +779,10 @@
}
}
}
+
+ long lastId = idGenerator.generateID();
- add(idGenerator.generateID());
+ add(lastId);
if (createControlFile && deleteControlFile && renameFilesAfterCompacting)
{
@@ -791,6 +793,15 @@
createJournal();
startJournal();
loadAndCheck();
+
+ journal.forceMoveNextFile();
+ update(lastId);
+
+ stopJournal();
+ createJournal();
+ startJournal();
+ loadAndCheck();
+
}
13 years, 6 months
JBoss hornetq SVN: r10773 - branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2011-06-03 11:21:19 -0400 (Fri, 03 Jun 2011)
New Revision: 10773
Modified:
branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessageProducer.java
Log:
duh!
Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessageProducer.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessageProducer.java 2011-06-03 13:53:02 UTC (rev 10772)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/jms/client/HornetQMessageProducer.java 2011-06-03 15:21:19 UTC (rev 10773)
@@ -314,7 +314,7 @@
private void doSend(final Message message, final long timeToLive, HornetQDestination destination) throws JMSException
{
- if (message.getJMSExpiration() != 0)
+ if (message.getJMSExpiration() == 0)
{
if (timeToLive == 0)
{
13 years, 6 months
JBoss hornetq SVN: r10772 - in branches/HORNETQ-681/hornetq-core/src/main/java/org/hornetq: utils and 1 other directory.
by do-not-reply@jboss.org
Author: gaohoward
Date: 2011-06-03 09:53:02 -0400 (Fri, 03 Jun 2011)
New Revision: 10772
Added:
branches/HORNETQ-681/hornetq-core/src/main/java/org/hornetq/utils/ClassloadingUtil.java
Modified:
branches/HORNETQ-681/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
Log:
HORNETQ-681
Modified: branches/HORNETQ-681/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java
===================================================================
--- branches/HORNETQ-681/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-03 08:14:56 UTC (rev 10771)
+++ branches/HORNETQ-681/hornetq-core/src/main/java/org/hornetq/core/client/impl/ClientSessionFactoryImpl.java 2011-06-03 13:53:02 UTC (rev 10772)
@@ -14,8 +14,6 @@
package org.hornetq.core.client.impl;
import java.lang.ref.WeakReference;
-import java.security.AccessController;
-import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
@@ -58,6 +56,7 @@
import org.hornetq.spi.core.remoting.ConnectionLifeCycleListener;
import org.hornetq.spi.core.remoting.Connector;
import org.hornetq.spi.core.remoting.ConnectorFactory;
+import org.hornetq.utils.ClassloadingUtil;
import org.hornetq.utils.ConcurrentHashSet;
import org.hornetq.utils.ConfigurationHelper;
import org.hornetq.utils.ExecutorFactory;
@@ -202,10 +201,10 @@
closeExecutor = orderedExecutorFactory.getExecutor();
this.interceptors = interceptors;
-
+
}
- public void connect(int initialConnectAttempts, boolean failoverOnInitialConnection) throws HornetQException
+ public void connect(final int initialConnectAttempts, final boolean failoverOnInitialConnection) throws HornetQException
{
// Get the connection
getConnectionWithRetry(initialConnectAttempts);
@@ -228,7 +227,7 @@
return connectorConfig;
}
- public void setBackupConnector(TransportConfiguration live, TransportConfiguration backUp)
+ public void setBackupConnector(final TransportConfiguration live, final TransportConfiguration backUp)
{
if(live.equals(connectorConfig) && backUp != null)
{
@@ -242,7 +241,7 @@
{
if (log.isDebugEnabled())
{
- log.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live + " / " + backUp + " but it didn't belong to " + this.connectorConfig);
+ log.debug("ClientSessionFactoryImpl received backup update for live/backup pair = " + live + " / " + backUp + " but it didn't belong to " + connectorConfig);
}
}
}
@@ -370,14 +369,14 @@
// Must be synchronized to prevent it happening concurrently with failover which can lead to
// inconsistencies
- public void removeSession(final ClientSessionInternal session, boolean failingOver)
+ public void removeSession(final ClientSessionInternal session, final boolean failingOver)
{
synchronized (sessions)
{
sessions.remove(session);
}
}
-
+
public void connectionReadyForWrites(final Object connectionID, final boolean ready)
{
}
@@ -461,7 +460,7 @@
{
stopPingingAfterOne = true;
}
-
+
public void resumePinging()
{
stopPingingAfterOne = false;
@@ -804,7 +803,7 @@
throw new IllegalStateException("Oh my God it's full of stars!");
}
- private void callFailureListeners(final HornetQException me, final boolean afterReconnect, boolean failedOver)
+ private void callFailureListeners(final HornetQException me, final boolean afterReconnect, final boolean failedOver)
{
final List<SessionFailureListener> listenersClone = new ArrayList<SessionFailureListener>(listeners);
@@ -866,7 +865,7 @@
{
sessionsToFailover = new HashSet<ClientSessionInternal>(sessions);
}
-
+
for (ClientSessionInternal session : sessionsToFailover)
{
session.handleFailover(connection);
@@ -902,7 +901,7 @@
if (reconnectAttempts != 0)
{
count++;
-
+
if (reconnectAttempts != -1 && count == reconnectAttempts)
{
log.warn("Tried " + reconnectAttempts + " times to connect. Now giving up on reconnecting it.");
@@ -1011,7 +1010,7 @@
{
log.debug("Trying to connect at the main server using connector :" + connectorConfig);
}
-
+
tc = connector.createConnection();
if (tc == null)
@@ -1020,7 +1019,7 @@
{
log.debug("Main server is not up. Hopefully there's a backup configured now!");
}
-
+
try
{
connector.close();
@@ -1058,7 +1057,7 @@
{
log.debug("Backup is not active yet");
}
-
+
try
{
connector.close();
@@ -1072,12 +1071,12 @@
else
{
/*looks like the backup is now live, lets use that*/
-
+
if (log.isDebugEnabled())
{
log.debug("Connected to the backup at " + backupConfig);
}
-
+
connectorConfig = backupConfig;
backupConfig = null;
@@ -1171,6 +1170,7 @@
return connection;
}
+ @Override
public void finalize() throws Throwable
{
if (!closed)
@@ -1188,24 +1188,7 @@
private ConnectorFactory instantiateConnectorFactory(final String connectorFactoryClassName)
{
- return AccessController.doPrivileged(new PrivilegedAction<ConnectorFactory>()
- {
- public ConnectorFactory run()
- {
- ClassLoader loader = Thread.currentThread().getContextClassLoader();
- try
- {
- Class<?> clazz = loader.loadClass(connectorFactoryClassName);
- return (ConnectorFactory)clazz.newInstance();
- }
- catch (Exception e)
- {
- throw new IllegalArgumentException("Error instantiating connector factory \"" + connectorFactoryClassName +
- "\"",
- e);
- }
- }
- });
+ return (ConnectorFactory)ClassloadingUtil.safeInitNewInstance(connectorFactoryClassName);
}
private void lockChannel1()
@@ -1262,7 +1245,7 @@
if (type == PacketImpl.DISCONNECT)
{
final DisconnectMessage msg = (DisconnectMessage)packet;
-
+
closeExecutor.execute(new Runnable()
{
// Must be executed on new thread since cannot block the netty thread for a long time and fail can
@@ -1329,7 +1312,7 @@
this.connectionID = connectionID;
}
- public void connectionFailed(final HornetQException me, boolean failedOver)
+ public void connectionFailed(final HornetQException me, final boolean failedOver)
{
handleConnectionFailure(connectionID, me);
}
@@ -1374,7 +1357,7 @@
first = false;
long now = System.currentTimeMillis();
-
+
if (clientFailureCheckPeriod != -1 && connectionTTL != -1 && now >= lastCheck + connectionTTL )
{
if (!connection.checkDataReceived())
@@ -1405,7 +1388,7 @@
}
/**
- *
+ *
*/
public void send()
{
Added: branches/HORNETQ-681/hornetq-core/src/main/java/org/hornetq/utils/ClassloadingUtil.java
===================================================================
--- branches/HORNETQ-681/hornetq-core/src/main/java/org/hornetq/utils/ClassloadingUtil.java (rev 0)
+++ branches/HORNETQ-681/hornetq-core/src/main/java/org/hornetq/utils/ClassloadingUtil.java 2011-06-03 13:53:02 UTC (rev 10772)
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2010 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.utils;
+
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+
+/**
+ * A ClassloadingUtil
+ *
+ * @author <a href="mailto:hgao@redhat.com">Howard Gao</a>
+ */
+public final class ClassloadingUtil
+{
+ public static Object safeInitNewInstance(final String className)
+ {
+ return AccessController.doPrivileged(new PrivilegedAction<Object>()
+ {
+ public Object run()
+ {
+ ClassLoader loader = ClassloadingUtil.class.getClassLoader();
+ if (loader == null)
+ {
+ loader = Thread.currentThread().getContextClassLoader();
+ }
+
+ try
+ {
+ Class<?> clazz = loader.loadClass(className);
+ return clazz.newInstance();
+ }
+ catch (Exception e)
+ {
+ throw new IllegalArgumentException("Error instantiating connector factory \"" + className + "\"", e);
+ }
+ }
+ });
+ }
+
+}
13 years, 6 months