[jboss-cvs] jboss-seam/src/remoting/org/jboss/seam/remoting/messaging ...
Shane Bryzak
sbryzak at redhat.com
Tue Feb 27 17:15:24 EST 2007
User: sbryzak2
Date: 07/02/27 17:15:24
Added: src/remoting/org/jboss/seam/remoting/messaging
JBossConnectionProvider.java
JMSConnectionProvider.java PollError.java
PollRequest.java RemoteSubscriber.java
SubscriptionRegistry.java SubscriptionRequest.java
Log:
JBSEAM-915
Revision Changes Path
1.1 date: 2007/02/27 22:15:24; author: sbryzak2; state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/JBossConnectionProvider.java
Index: JBossConnectionProvider.java
===================================================================
package org.jboss.seam.remoting.messaging;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.naming.InitialContext;
/**
*
* @author Shane Bryzak
*/
public class JBossConnectionProvider implements JMSConnectionProvider
{
private static final String FACTORY_JNDI_NAME = "UIL2ConnectionFactory";
public TopicConnection createConnection()
throws Exception
{
InitialContext ctx = new InitialContext();
TopicConnectionFactory f = (TopicConnectionFactory) ctx.lookup(FACTORY_JNDI_NAME);
return f.createTopicConnection();
}
}
1.1 date: 2007/02/27 22:15:24; author: sbryzak2; state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/JMSConnectionProvider.java
Index: JMSConnectionProvider.java
===================================================================
package org.jboss.seam.remoting.messaging;
import javax.jms.TopicConnection;
/**
*
* @author Shane Bryzak
*/
public interface JMSConnectionProvider {
public TopicConnection createConnection() throws Exception;
}
1.1 date: 2007/02/27 22:15:24; author: sbryzak2; state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/PollError.java
Index: PollError.java
===================================================================
package org.jboss.seam.remoting.messaging;
/**
*
* @author Shane Bryzak
*/
public class PollError
{
public static final String ERROR_CODE_TOKEN_NOT_FOUND = "TOKEN_NOT_FOUND";
public static final String ERROR_CODE_JMS_EXCEPTION = "JMS_EXCEPTION";
private String code;
private String message;
public PollError(String code, String message)
{
this.code = code;
this.message = message;
}
public String getCode()
{
return code;
}
public String getMessage()
{
return message;
}
}
1.1 date: 2007/02/27 22:15:24; author: sbryzak2; state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/PollRequest.java
Index: PollRequest.java
===================================================================
package org.jboss.seam.remoting.messaging;
import java.util.ArrayList;
import java.util.List;
import javax.jms.Message;
import javax.jms.*;
/**
* Wrapper for a single request for a specified subscription poll.
*
* @author Shane Bryzak
*/
public class PollRequest
{
private String token;
private int timeout;
private List<Message> messages;
private List<PollError> errors = new ArrayList<PollError>();
public PollRequest(String token, int timeout)
{
this.token = token;
this.timeout = timeout;
}
public String getToken()
{
return token;
}
public List<Message> getMessages()
{
return messages;
}
public List<PollError> getErrors()
{
return errors;
}
public void poll()
{
RemoteSubscriber subscriber = SubscriptionRegistry.instance().getSubscription(token);
if (subscriber != null)
{
try
{
messages = subscriber.poll(timeout);
}
catch (JMSException ex)
{
errors.add(new PollError(PollError.ERROR_CODE_JMS_EXCEPTION,
"Error polling for messages"));
}
}
else
errors.add(new PollError(PollError.ERROR_CODE_TOKEN_NOT_FOUND,
"No subscription was found for the specified token."));
}
}
1.1 date: 2007/02/27 22:15:24; author: sbryzak2; state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/RemoteSubscriber.java
Index: RemoteSubscriber.java
===================================================================
package org.jboss.seam.remoting.messaging;
import java.util.ArrayList;
import java.util.List;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicSession;
import javax.jms.TopicSubscriber;
/**
*
* @author Shane Bryzak
*/
public class RemoteSubscriber
{
private String token;
private String topicName;
private Topic topic;
private TopicSession topicSession;
private TopicSubscriber subscriber;
public RemoteSubscriber(String token, String topicName)
{
this.token = token;
this.topicName = topicName;
}
public String getToken()
{
return token;
}
public String getTopicName()
{
return topicName;
}
public void subscribe(TopicConnection conn)
throws JMSException
{
topicSession = conn.createTopicSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
topic = topicSession.createTopic(topicName);
subscriber = topicSession.createSubscriber(topic);
}
public void unsubscribe()
{
try {
subscriber.close();
// Remove the subscription's token from the user's session context
SubscriptionRegistry.instance().getUserTokens().remove(token);
}
catch (JMSException ex) { }
try {
topicSession.close();
}
catch (JMSException ex) { }
}
public void setTopicSubscriber(TopicSubscriber subscriber)
{
this.subscriber = subscriber;
}
public TopicSubscriber getTopicSubscriber()
{
return subscriber;
}
public List<Message> poll(int timeout)
throws JMSException
{
List<Message> messages = null;
Message m = null;
synchronized(subscriber)
{
do {
// Only timeout for the first message.. subsequent messages should be nowait
if (messages == null && timeout > 0)
m = subscriber.receive(timeout * 1000);
else
m = subscriber.receiveNoWait();
if (m != null) {
if (messages == null)
messages = new ArrayList<Message> ();
messages.add(m);
}
}
while (m != null);
}
return messages;
}
}
1.1 date: 2007/02/27 22:15:24; author: sbryzak2; state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/SubscriptionRegistry.java
Index: SubscriptionRegistry.java
===================================================================
package org.jboss.seam.remoting.messaging;
import static org.jboss.seam.InterceptionType.NEVER;
import static org.jboss.seam.annotations.Install.BUILT_IN;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.TopicConnection;
import org.jboss.seam.log.LogProvider;
import org.jboss.seam.log.Logging;
import org.jboss.seam.Component;
import org.jboss.seam.ScopeType;
import org.jboss.seam.annotations.Install;
import org.jboss.seam.annotations.Intercept;
import org.jboss.seam.annotations.Name;
import org.jboss.seam.annotations.Scope;
import org.jboss.seam.contexts.Context;
import org.jboss.seam.contexts.Contexts;
import org.jboss.seam.util.Reflections;
/**
*
* @author Shane Bryzak
*/
@Scope(ScopeType.APPLICATION)
@Intercept(NEVER)
@Name("org.jboss.seam.remoting.messaging.subscriptionRegistry")
@Install(value = false, precedence=BUILT_IN)
public class SubscriptionRegistry
{
private static final String DEFAULT_CONNECTION_PROVIDER =
"org.jboss.seam.remoting.messaging.JBossConnectionProvider";
public static final String CONTEXT_USER_TOKENS =
"org.jboss.seam.remoting.messaging.SubscriptionRegistry.userTokens";
private static final LogProvider log = Logging.getLogProvider(SubscriptionRegistry.class);
private String connectionProvider;
private volatile TopicConnection topicConnection;
private Object monitor = new Object();
private Map<String,RemoteSubscriber> subscriptions = new HashMap<String,RemoteSubscriber>();
/**
* Contains a list of all the topics that clients are allowed to subscribe to.
*/
private Set<String> allowedTopics = new HashSet<String>();
public static SubscriptionRegistry instance()
{
SubscriptionRegistry registry = (SubscriptionRegistry) Component.getInstance(SubscriptionRegistry.class, true);
if (registry == null)
{
throw new IllegalStateException("No SubscriptionRegistry exists");
}
return registry;
}
public Set<String> getAllowedTopics()
{
return allowedTopics;
}
public void setAllowedTopics(Set<String> allowedTopics)
{
this.allowedTopics = allowedTopics;
}
public String getConnectionProvider()
{
return connectionProvider;
}
public void setConnectionProvider(String connectionProvider)
{
this.connectionProvider = connectionProvider;
}
private TopicConnection getTopicConnection()
throws Exception
{
if (topicConnection == null)
{
synchronized(monitor)
{
if (topicConnection == null)
{
String providerName = connectionProvider != null ?
connectionProvider : DEFAULT_CONNECTION_PROVIDER;
try {
Class providerClass = Reflections.classForName(providerName);
JMSConnectionProvider provider = (JMSConnectionProvider) providerClass.newInstance();
topicConnection = provider.createConnection();
topicConnection.setExceptionListener(new ExceptionListener() {
public void onException(JMSException ex)
{
// swallow the exception for now - do we need to try and reconnect???
}
});
topicConnection.start();
}
catch (ClassNotFoundException ex)
{
log.error(String.format("Topic connection provider class [%s] not found",
providerName));
throw ex;
}
catch (InstantiationException ex)
{
log.error(String.format("Failed to create connection provider [%s]",
providerName));
throw ex;
}
}
}
}
return topicConnection;
}
public RemoteSubscriber subscribe(String topicName)
{
if (!allowedTopics.contains(topicName))
throw new IllegalArgumentException(String.format(
"Cannot subscribe to a topic that is not allowed. Topic [%s] is not an " +
"allowed topic.", topicName));
RemoteSubscriber sub = new RemoteSubscriber(UUID.randomUUID().toString(), topicName);
try {
sub.subscribe(getTopicConnection());
subscriptions.put(sub.getToken(), sub);
// Save the client's token in their session context
getUserTokens().add(sub.getToken());
return sub;
}
catch (Exception ex) {
log.error(ex);
return null;
}
}
/**
*
* @return Set
*/
public Set getUserTokens()
{
Context session = Contexts.getSessionContext();
if (session.get(CONTEXT_USER_TOKENS) == null)
{
synchronized(session)
{
if (session.get(CONTEXT_USER_TOKENS) == null)
session.set(CONTEXT_USER_TOKENS, new HashSet<String> ());
}
}
return (Set) session.get(CONTEXT_USER_TOKENS);
}
public RemoteSubscriber getSubscription(String token)
{
if (!getUserTokens().contains(token))
throw new IllegalArgumentException(
"Invalid token argument - token not found in Session Context.");
return subscriptions.get(token);
}
}
1.1 date: 2007/02/27 22:15:24; author: sbryzak2; state: Exp;jboss-seam/src/remoting/org/jboss/seam/remoting/messaging/SubscriptionRequest.java
Index: SubscriptionRequest.java
===================================================================
package org.jboss.seam.remoting.messaging;
import java.io.IOException;
import java.io.OutputStream;
/**
*
* @author Shane Bryzak
*/
public class SubscriptionRequest
{
private String topicName;
private RemoteSubscriber subscriber;
public SubscriptionRequest(String topicName)
{
this.topicName = topicName;
}
public void subscribe()
{
subscriber = SubscriptionRegistry.instance().subscribe(topicName);
}
public void marshal(OutputStream out)
throws IOException
{
out.write("<subscription topic=\"".getBytes());
out.write(topicName.getBytes());
out.write("\" token=\"".getBytes());
out.write(subscriber.getToken().getBytes());
out.write("\"/>".getBytes());
out.flush();
}
}
More information about the jboss-cvs-commits
mailing list