[jboss-user] [JBoss Messaging] - High volume usage patterns and best practices

xxbrandonoxx do-not-reply at jboss.com
Wed Sep 17 16:53:39 EDT 2008


I am trying to implement a system that will require consistently high volume with long running and constantly connected clients.  To date I have followed the samples included with JBoss Messaging and am creating a new connection+session+temp_response_queue on a per request basis.  It was suggested that some of this is actually an anti-pattern.

This works fairly well and is fast enough in my local development environment (a laptop), but it breaks down when I promote it to a clustered development environment that is otherwise identical.  I fairly quickly run out of memory, and get a 25+% failure rate even at low volumes.  I am inclined to believe it is in part due to the way in which I am accessing the queues and topics.

I would like advice on the best way to handle the "pooling" of some of these objects (connections and sessions) for reuse.  Is this advisable at all?  If so, are there any example solutions anyone is aware of?  I would hate to have to create my own pooling logic, with all the inherent issues in high volume systems.

One example of where I do not see the feasibility of this is in the connection object.  I do not see any exposed way in the API to "test" the connectedness of the connection objects.  I would assume in any pooling situation I would have to make sure it is still alive, etc.
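
To make the question concrete, here is a minimal sketch of the kind of pooling I have in mind. It uses only java.util.concurrent; the class name SimplePool and the isValid check are hypothetical and not part of JBoss Messaging or JMS. For JMS, T would be a Session (one borrower at a time, since a Session must not be used by several threads concurrently), and isValid could consult a flag set from an ExceptionListener registered via Connection.setExceptionListener, which is the only standard notification of connection trouble I know of.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Predicate;
import java.util.function.Supplier;

/** Hypothetical pool sketch; bounds the number of IDLE resources, not the total created. */
final class SimplePool<T> {
    private final BlockingQueue<T> idle;
    private final Supplier<T> factory;   // creates a new resource on demand
    private final Predicate<T> isValid;  // hypothetical health check supplied by the caller

    SimplePool(int capacity, Supplier<T> factory, Predicate<T> isValid) {
        this.idle = new ArrayBlockingQueue<T>(capacity);
        this.factory = factory;
        this.isValid = isValid;
    }

    /** Returns an idle resource that passes the health check, or creates a new one. */
    T borrow() {
        T candidate;
        while ((candidate = idle.poll()) != null) {
            if (isValid.test(candidate)) {
                return candidate;
            }
            // Stale resource: drop it and try the next idle one.
        }
        return factory.get();
    }

    /** Hands a resource back; returns false if it was discarded (invalid, or pool full). */
    boolean giveBack(T resource) {
        return isValid.test(resource) && idle.offer(resource);
    }

    /** Number of resources currently waiting to be borrowed. */
    int idleCount() {
        return idle.size();
    }
}
```

A caller would borrow() in a try block and giveBack() in the matching finally. When giveBack() returns false the caller still owns the resource and should close it.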

Any help would be appreciated.

Thanks,
  Brandon
=======================================================
Solution Description
Client: A simple one-form, one-page JSF app
Server: A fairly simple MDB.  It currently makes an HTTP request and echoes the response to a client-created temporary response queue.  It also forwards a copy to an "observer" topic for further SOA visibility.


  | public String executeSynchronous(String resourceUrl, HTTPMethod method, String payload, final long timeout) {
  | 		JMSUtil jmsUtil = new JMSUtil();
  | 		if (timeout > MAX_TIMEOUT) {
  | 			return "Error: timeout provided exceeds MAX_TIMEOUT(" + MAX_TIMEOUT + ")";
  | 		}
  | 
  | 		Map<String, String> headers = new HashMap<String, String>();
  | 		headers.put("Method", method.name());
  | 		headers.put("URL", resourceUrl);
  | 
  | 		String messageId = JMSUtil.generateUUID();
  | 		Session session = jmsUtil.generateSession();
  | 		Queue responseQueue = jmsUtil.generateResponseQueue(session);
  | 		jmsUtil.sendSOAQueueMessage(session, payload, jndiReferenceSOAIn, headers, messageId, responseQueue);
  | 		String response = jmsUtil.getSOAResponse(session, responseQueue, timeout);
  | 		jmsUtil.closeSession(session);
  | 		
  | 		return response;
  | 	}
  | 
calls...

  | public void sendSOAQueueMessage(Session session, String payload, String jndiReference, Map<String, String> headers, String messageId, Queue responseQueue) {
  | 		try {
  | 			// Create Message
  | 			TextMessage message = session.createTextMessage(payload);
  | 
  | 			// Assign Headers
  | 			for (String headerName : headers.keySet()) {
  | 				message.setStringProperty(headerName, headers.get(headerName));
  | 			}
  | 
  | 			// Set UUID
  | 			message.setStringProperty("EntertainmentSOARequestId", messageId);
  | 			message.setJMSReplyTo(responseQueue);
  | 
  | 			// Create the producer.
  | 
  | 			MessageProducer sender = session.createProducer(getSOAInQueue());
  | 			sender.send(message);
  | 			sender.close();
  | 
  | 			// Commit if needed
  | 			if (transacted) {
  | 				session.commit();
  | 			}
  | 		} catch (JMSException e) {
  | 			logger.error("A JMS Exception occurred in sending a JMS SOA message!", e);
  | 		}
  | 	}
  | 

Supported by the following util methods...

  | protected static Connection getConnection() {
  | 		if (connection == null) {
  | 			Hashtable properties = new Hashtable();
  | 			properties.put(Context.INITIAL_CONTEXT_FACTORY, "org.jnp.interfaces.NamingContextFactory");
  | 			properties.put(Context.URL_PKG_PREFIXES, "org.jboss.naming:org.jnp.interfaces");
  | 			properties.put(Context.PROVIDER_URL, "jnp://localhost:1099");
  | 			properties.put(Context.SECURITY_PRINCIPAL, "not_real");
  | 			properties.put(Context.SECURITY_CREDENTIALS, "not_real");
  | 
  | 			Context context;
  | 			try {
  | 				context = new InitialContext(properties);
  | 				connectionFactory = (ConnectionFactory) context.lookup("ConnectionFactory");
  | 			} catch (NamingException e) {
  | 
  | 				e.printStackTrace();
  | 				return null;
  | 			}
  | 
  | 			try {
  | 				connection = connectionFactory.createConnection();
  | 				connection.start();
  | 			} catch (JMSException e) {
  | 				return null;
  | 			}
  | 		}
  | 		return connection;
  | 	}
  | public Queue generateResponseQueue(Session session) {
  | 		try {
  | 			return session.createTemporaryQueue();
  | 		} catch (JMSException e) {
  | 			logger.error("Could not generate temporary queue for receiving response", e);
  | 			return null;
  | 		}
  | 	}
  | public Session generateSession() {
  | 		try {
  | 			// Get a connection
  | 			Connection connection = getConnection();
  | 
  | 			// Create the session
  | 			return connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
  | 		} catch (JMSException e) {
  | 			logger.error("Could not generate session", e);
  | 			return null;
  | 		}
  | 	}
  | public String getSOAResponse(Session session, Destination responseQueue, long timeout) {
  | 		try {
  | 			// Create the consumer
  | 			MessageConsumer consumer = session.createConsumer(responseQueue);
  | 			Message message = consumer.receive(timeout);
  | 			consumer.close();
  | 
  | 			// receive() returns null when the timeout expires, so check before using the message
  | 			if (message == null) {
  | 				logger.error("Timed out waiting for response message!");
  | 				return "Timed out waiting for response message!";
  | 			}
  | 			message.acknowledge();
  | 
  | 			if (message instanceof TextMessage) {
  | 				return ((TextMessage) message).getText();
  | 			} else {
  | 				logger.error("Received message was not of type TextMessage!");
  | 				return "Received message was not of type TextMessage!";
  | 			}
  | 
  | 		} catch (JMSException e) {
  | 			logger.error("A JMS Exception occurred in receiving a JMS SOA message!", e);
  | 			return "A JMS Exception occurred in receiving a JMS SOA message! " + e.getMessage();
  | 		}
  | 	}
  | 
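
The helpers above create one temporary queue per request and never delete it. As far as I understand the JMS API, a temporary queue lives as long as its connection unless TemporaryQueue.delete() is called, so with the shared static connection these may accumulate. One alternative I am considering is a single long-lived reply queue with replies matched to callers by request id. Below is a rough in-memory sketch of just the matching part. ReplyCorrelator and its method names are hypothetical, it uses only java.util.concurrent, and I have not verified how this would behave in a JBoss Messaging cluster.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: matches replies arriving on one shared channel to waiting callers. */
final class ReplyCorrelator {
    private final ConcurrentMap<String, BlockingQueue<String>> waiting =
            new ConcurrentHashMap<String, BlockingQueue<String>>();

    /** Call before sending the request so that a fast reply is not missed. */
    void register(String requestId) {
        waiting.put(requestId, new LinkedBlockingQueue<String>(1));
    }

    /** Called by whatever consumes the shared reply queue; false if nobody is waiting. */
    boolean deliver(String requestId, String body) {
        BlockingQueue<String> slot = waiting.get(requestId);
        return slot != null && slot.offer(body);
    }

    /** Blocks up to timeoutMillis; returns null on timeout or interruption. */
    String await(String requestId, long timeoutMillis) {
        BlockingQueue<String> slot = waiting.get(requestId);
        if (slot == null) {
            throw new IllegalStateException("Not registered: " + requestId);
        }
        try {
            return slot.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return null;
        } finally {
            waiting.remove(requestId); // late replies for this id are dropped by deliver()
        }
    }
}
```

In the code above, the request id would be the value already sent as "EntertainmentSOARequestId", and deliver() would be called from a single consumer on the shared reply queue.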

MDB


  | @MessageDriven(activationConfig = { @ActivationConfigProperty(propertyName = "destinationType", propertyValue = "javax.jms.Queue"),
  | 		@ActivationConfigProperty(propertyName = "destination", propertyValue = "queue/SOAInQueue"),
  | 		@ActivationConfigProperty(propertyName = "acknowledgeMode", propertyValue = "Auto-acknowledge") })
  | public class SOARouterMDB implements MessageListener {
  | 
  | 	private final RESTAccessor restAccessor = new RESTAccessor();
  | 
  | 	private static Logger logger = Logger.getLogger(SOARouterMDB.class);
  | 
  | 	public void onMessage(Message msg) {
  | 		try {
  | 			Responder responder = new Responder();
  | 			TextMessage tmsg = (TextMessage) msg;
  | 
  | 			HTTPMethod method = HTTPMethod.valueOf(tmsg.getStringProperty("Method"));
  | 			String url = tmsg.getStringProperty("URL");
  | 			Map<String, String> httpHeaders = new HashMap<String, String>();
  | 			Map<String, String> allHeaders = new HashMap<String, String>();
  | 			Destination destination = tmsg.getJMSReplyTo();
  | 			
  | 			System.out.println("Destination: " + destination);
  | 
  | 			// Load HttpHeaders
  | 			Enumeration<String> propertyNames = tmsg.getPropertyNames();
  | 			while (propertyNames.hasMoreElements()) {
  | 				String propertyName = propertyNames.nextElement();
  | 				if (propertyName.startsWith("HttpHeader")) {
  | 					httpHeaders.put(propertyName.substring(10), tmsg.getStringProperty(propertyName));
  | 				}
  | 				allHeaders.put(propertyName, tmsg.getStringProperty(propertyName));
  | 			}
  | 
  | 			HTTPResponse response = null;
  | 
  | 			switch (method) {
  | 			case GET:
  | 				response = restAccessor.doGet(url, httpHeaders);
  | 				if (!response.isError()) {
  | 					responder.respond(destination, allHeaders, response.getData());
  | 					responder.broadcast(allHeaders, response.getData());
  | 				} else {
  | 					responder.reportError(tmsg);
  | 				}
  | 				break;
  | 			case POST:
  | 				response = restAccessor.doPost(url, httpHeaders, new ByteArrayInputStream(tmsg.getText().getBytes()));
  | 				if (!response.isError()) {
  | 					responder.respond(destination, allHeaders, response.getData());
  | 					responder.broadcast(allHeaders, response.getData());
  | 				} else {
  | 					responder.reportError(tmsg);
  | 				}
  | 				break;
  | 			case DELETE:
  | 				response = restAccessor.doDelete(url, httpHeaders, tmsg.getText());
  | 				if (!response.isError()) {
  | 					responder.respond(destination, allHeaders, response.getData());
  | 					responder.broadcast(allHeaders, response.getData());
  | 				} else {
  | 					responder.reportError(tmsg);
  | 				}
  | 				break;
  | 			case PUT:
  | 				response = restAccessor.doPost(url, httpHeaders, new ByteArrayInputStream(tmsg.getText().getBytes()));
  | 				if (!response.isError()) {
  | 					responder.respond(destination, allHeaders, response.getData());
  | 					responder.broadcast(allHeaders, response.getData());
  | 				} else {
  | 					responder.reportError(tmsg);
  | 				}
  | 				break;
  | 			default:
  | 				// TODO: asdf
  | 			}
  | 		} catch (Exception e) {
  | 			logger.error("Error processing SOA request", e);
  | 		}
  | 	}
  | 
  | 	@PreDestroy
  | 	public void destroy() {
  | 	}
  | 
  | }
  | 

View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4177242#4177242
