Title : HornetQ with CORE API result in HTTP Thread Waiting
Product Details :
Jboss 5.1 Native
HornetQ 2.2.5 Final
Issue :
Jboss with HornetQ core api not able to scale and hence degrade the performance of the system as HTTP thread goes into waiting state
Thread Dump :
"http-0.0.0.0-8180-48" daemon prio=10 tid=0x00007fdd6800e000 nid=0x5ddf waiting on condition [0x00007fdd576f4000]
java.lang.Thread.State: WAITING (parking)
at sun.misc.Unsafe.park(Native Method)
- parking to wait for <0x00007fdfc10ab010> (a java.util.concurrent.Semaphore$NonfairSync)
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:156)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(AbstractQueuedSynchronizer.java:811)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.doAcquireSharedInterruptibly(AbstractQueuedSynchronizer.java:969)
at java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireSharedInterruptibly(AbstractQueuedSynchronizer.java:1281)
at java.util.concurrent.Semaphore.acquire(Semaphore.java:441)
at org.hornetq.core.client.impl.ClientProducerCreditsImpl.acquireCredits(ClientProducerCreditsImpl.java:74)
at org.hornetq.core.client.impl.ClientProducerImpl.doSend(ClientProducerImpl.java:305)
at org.hornetq.core.client.impl.ClientProducerImpl.send(ClientProducerImpl.java:135)
at com.demo.services.MyServiceImpl.sendMessage( MyServiceImpl.java:174)
at com.demo.services.MyServiceImpl.placementRequest( MyServiceImpl.java:103)
at sun.reflect.GeneratedMethodAccessor285.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
at java.lang.reflect.Method.invoke(Method.java:597)
at org.apache.cxf.service.invoker.AbstractInvoker.performInvocation(AbstractInvoker.java:180)
at org.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:96)
at org.apache.cxf.jaxrs.JAXRSInvoker.invoke(JAXRSInvoker.java:167)
at org.apache.cxf.jaxrs.JAXRSInvoker.invoke(JAXRSInvoker.java:94)
at org.apache.cxf.interceptor.ServiceInvokerInterceptor$1.run(ServiceInvokerInterceptor.java:58)
at org.apache.cxf.interceptor.ServiceInvokerInterceptor.handleMessage(ServiceInvokerInterceptor.java:94)
at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:262)
- locked <0x00007fe54ee74e80> (a org.apache.cxf.phase.PhaseInterceptorChain)
at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:122)
at org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:211)
at org.apache.cxf.transport.servlet.ServletController.invokeDestination(ServletController.java:213)
at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:154)
at org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:129)
at org.apache.cxf.transport.servlet.AbstractHTTPServlet.handleRequest(AbstractHTTPServlet.java:187)
at org.apache.cxf.transport.servlet.AbstractHTTPServlet.doPost(AbstractHTTPServlet.java:110)
at com.demo.services.MyCXFServlet.doPost( MyCXFServlet.java:33)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:637)
at org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:166)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:290)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.jboss.web.tomcat.filters.ReplyHeaderFilter.doFilter(ReplyHeaderFilter.java:96)
at org.apache.catalina.core.ApplicationFilterChain.internalDoFilter(ApplicationFilterChain.java:235)
at org.apache.catalina.core.ApplicationFilterChain.doFilter(ApplicationFilterChain.java:206)
at org.apache.catalina.core.StandardWrapperValve.invoke(StandardWrapperValve.java:235)
at org.apache.catalina.core.StandardContextValve.invoke(StandardContextValve.java:191)
at org.jboss.web.tomcat.security.SecurityAssociationValve.invoke(SecurityAssociationValve.java:190)
at org.jboss.web.tomcat.security.JaccContextValve.invoke(JaccContextValve.java:92)
at org.jboss.web.tomcat.security.SecurityContextEstablishmentValve.process(SecurityContextEstablishmentValve.java:126)
at org.jboss.web.tomcat.security.SecurityContextEstablishmentValve.invoke(SecurityContextEstablishmentValve.java:70)
at org.apache.catalina.core.StandardHostValve.invoke(StandardHostValve.java:127)
at org.apache.catalina.valves.ErrorReportValve.invoke(ErrorReportValve.java:102)
at org.jboss.web.tomcat.service.jca.CachedConnectionValve.invoke(CachedConnectionValve.java:158)
at org.apache.catalina.core.StandardEngineValve.invoke(StandardEngineValve.java:109)
at org.apache.catalina.connector.CoyoteAdapter.service(CoyoteAdapter.java:330)
at org.apache.coyote.http11.Http11AprProcessor.process(Http11AprProcessor.java:905)
at org.apache.coyote.http11.Http11AprProtocol$Http11ConnectionHandler.process(Http11AprProtocol.java:592)
at org.apache.tomcat.util.net.AprEndpoint$Worker.run(AprEndpoint.java:2036)
at java.lang.Thread.run(Thread.java:662)
Details :
Requirement :
A REST(XmlOverHttp) based web service which respond to request, After responding to request, notification will be generated using the sent response and the notification will be sent to billing system.
Delay of 10-15 min is ok for sending notifications but there has to be guarranty that every response sent should send notification. Because notifications are related to revenue / billing.
Implementation and Observation :
In order to send notification after responding to webservice request, we used JMS. To make notification sending asynchronouts, before returning the response our serviceImpl class is putting response on jms queue,
later the consumer will prepare notification using the response received.
To achieve this initially we started with jboss Messaging using JMS API with persistence
Using this with the session caching we achieve TPS of around 1100
Here we found JMS send is taking time in send so the performance of normal flow getting impacted.
Then we moved to hornetQ , and using JMS API in same JBOSS we almost find the same TPS.
Latter we tried with standalone HorentQ. Here we reached TPS Up to 1300
After that we came to know hornetQ core Api are faster than JMS API.
So we moved our code to HornetQ core API.
But we find that our TPS gets degraded to 200-250 using same.
Code Snippet :
Lifecycle Class which initialize the producer and consumer
public static void init() {
clearCache()
final MyConfiguration MyCfg = MyCfgMgr.getAppCfg();
mySession mySession = null;
try {
TransportConfiguration config = new TransportConfiguration(
NettyConnectorFactory.class.getName());
locator = HornetQClient.createServerLocatorWithoutHA(config);
locator.setBlockOnAcknowledge(false);
locator.setBlockOnDurableSend(false);
myfactory = locator.createSessionFactory();
}catch(HornetQException je){
LOGGER.log("HornetQException ", je);
}catch(Exception je){
LOGGER.log("Exception ", je);
}
for (int i = 0; i < myCfg.getMDBPoolSize(); i++) {
try {
ClientSession session = myfactory.createSession(true , true,1);
session.start();
ClientProducer clientProducer = session.createProducer("myAddress");
mySession = new mySession(session, clientProducer ) ;
}catch(HornetQException je){
LOGGER.log("HornetQException ", je);
}catch(Exception je){
LOGGER.log("Exception ", je);
}
mySessionList.add(mySession);
}
initmyConsumer();
}
public static void initmyConsumer(){
try{
TransportConfiguration config = new TransportConfiguration(NettyConnectorFactory.class.getName());
consumerLocator = HornetQClient.createServerLocatorWithoutHA(config);
consumerLocator.setBlockOnAcknowledge(false);
consumerLocator.setBlockOnDurableSend(false);
hFactory = consumerLocator.createSessionFactory();
consumerSession = hFactory.createSession(true,true,1);
consumerSession.start();
for(int i = 1 ; i < myCfg.getMDBPoolSize() ; i++){
ClientConsumer consumer = consumerSession.createConsumer("myQueue");
consumer.setMessageHandler(new MyMsgConsumer());
consumerList.add(consumer);
}
}catch(HornetQException je){
LOGGER.log("HornetQException ", je);
}catch(Exception je){
LOGGER.log("Exception ", je);
}
}
public static void clearCache()
{
LOGGER.log("clearCache PdnSessionCache : "+pdnSessionList.size());
for(PdnSession pdnSession : pdnSessionList){
try {
pdnSession.getClientProducer().close();
pdnSession.getSession().close();
} catch (HornetQException e) {
LOGGER.log("HornetQException while closing Sessions from cache ", e);
}
}
if (locator != null) {
locator.close();
}
if (pdnfactory != null) {
pdnfactory.close();
}
pdnSessionList.clear();
try {
clearConsumer();
} catch (HornetQException e) {
LOGGER.log("HornetQException while closing ClientSession ", e);
}
}
public static void clearConsumer() throws HornetQException
{
for(ClientConsumer c : consumerList){
c.close();
}
consumerList.clear();
if (consumerLocator != null) {
consumerLocator.close();
}
if (consumer != null) {
consumer.close();
}
if (consumerSession != null) {
consumerSession.close();
}
if (hFactory != null) {
hFactory.close();
}
}
MyMsgConsumer class is message consumer and is responsible for handling messages and sending notifications.
Message Creation and sending :
Class Name : MyServiceImpl.java
private void sendMessage(final String res){
try{
// MySession is wrapper class holding ClientSession and ClientProducer as its members
MySession mySession = MyGlobals.getmyJmsSession();
ClientMessage msg = mySession.getSession().createMessage(org.hornetq.api.core.Message.TEXT_TYPE, true);
msg.getBodyBuffer().writeString(res);
mySession.getClientProducer().send(msg);
MyGlobals.returnmyJmsSession(mySession);
}catch(HornetQException je){
LOGGER.log("HornetQException ", je);
}catch(Exception je){
LOGGER.log("Exception ", je);
}
}