How to implement Streaming server like The Bayeux Protocol

sumitatgharkikhoj mailtosumitsingh at gmail.com
Mon Nov 23 23:02:47 EST 2009


First of all, thank you for the nice framework, forum and documentation.
My problem is that I am trying to write an HTTP server with a small footprint
(it will go in as a component of a bigger framework), but it needs to have
streaming support.
Different clients will connect at different times, ranging from 100 to 1000
clients at most, and this server will be used for both streaming and short data.
The data is streamed using an XHR multipart request, continuously (24/7).
But when I used the example code (please see below), the server blocks after
the first thread.
The other options I tried were Jetty with continuations or CometD, but I am
more inclined towards Netty (since I want to replace my MINA UDP layer with
Netty and maintain one single library).
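
For reference, here is a minimal sketch of how the parts of a multipart/x-mixed-replace stream are usually framed. It uses the same boundary (XXoXoX) as the handler code further down. The class and method names (MultipartFraming, openingDelimiter, part) are made up for illustration and are not part of Netty. It assumes the general MIME multipart rule that every part, including the first one, is preceded by a delimiter line made of "--" plus the boundary.

```java
final class MultipartFraming {
    // Boundary taken from the Content-Type header used in this post.
    static final String BOUNDARY = "XXoXoX";
    static final String CRLF = "\r\n";

    // Delimiter line written once, before the first part.
    static String openingDelimiter() {
        return "--" + BOUNDARY + CRLF;
    }

    // One part: part headers, blank line, body, then the delimiter
    // that introduces the next part.
    static String part(String contentType, String body) {
        return "Content-Type: " + contentType + CRLF
                + CRLF
                + body + CRLF
                + "--" + BOUNDARY + CRLF;
    }

    public static void main(String[] args) {
        StringBuilder stream = new StringBuilder(openingDelimiter());
        for (int i = 1; i <= 2; i++) {
            stream.append(part("text/plain", "{\"data\":\"" + i + "\"}"));
        }
        System.out.print(stream);
    }
}
```

In a real handler the opening delimiter would be written once after the response headers, and each call to part(...) would become one chunk.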

Any help will be appreciated. I also have my machine available via WebEx for
debugging.


My server blocks at the second request with the following code:

1) In my HTTP server:
    ServerBootstrap bootstrap = new ServerBootstrap(
            new NioServerSocketChannelFactory(
                    Executors.newCachedThreadPool(),
                    Executors.newCachedThreadPool()));
    bootstrap.setPipelineFactory(new HttpServerPipelineFactory(requestHandler));
    bootstrap.bind(new InetSocketAddress(8080));
2) In my pipeline factory:
    public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = pipeline();
        pipeline.addLast("decoder", new HttpRequestDecoder());
        pipeline.addLast("encoder", new HttpResponseEncoder());
        pipeline.addLast("chunkedWriter", new ChunkedWriteHandler());
        pipeline.addLast("handler", requestHandler);
        return pipeline;
    }
3) My HTTP request handler sends events using the following code:
	private void writeResponse(MessageEvent e) {
		ChannelBuffer buf =
				ChannelBuffers.copiedBuffer(responseContent.toString(), "UTF-8");
		buf.setBytes(0, responseContent.toString().getBytes() );
		responseContent.setLength(0);
		boolean close =  HttpHeaders.Values.CLOSE.equalsIgnoreCase(request
				.getHeader(HttpHeaders.Names.CONNECTION))
				|| request.getProtocolVersion().equals(HttpVersion.HTTP_1_0)
				&& !HttpHeaders.Values.KEEP_ALIVE.equalsIgnoreCase(request
						.getHeader(HttpHeaders.Names.CONNECTION));

		HttpResponse response =
				new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.OK);
		//response.setContent(buf);
		//response.setHeader(HttpHeaders.Names.CONTENT_TYPE, "text/html; charset=UTF-8");
		response.setHeader(HttpHeaders.Names.CONTENT_TYPE,
				"multipart/x-mixed-replace;boundary=XXoXoX");
		//if (!close) 
		{
			response.setHeader("Connection", "Keep-Alive");
			response.setHeader("Cache-Control", "no-cache");
			response.setHeader("Access-Control-Allow-Origin", "*");
		}

		String cookieString = request.getHeader(HttpHeaders.Names.COOKIE);
		if (cookieString != null) {
			CookieDecoder cookieDecoder = new CookieDecoder();
			Set<Cookie> cookies = cookieDecoder.decode(cookieString);
			if (!cookies.isEmpty()) {
				CookieEncoder cookieEncoder = new CookieEncoder(true);
				for (Cookie cookie : cookies) {
					cookieEncoder.addCookie(cookie);
				}
				response.addHeader(HttpHeaders.Names.SET_COOKIE,
						cookieEncoder.encode());
			}
		}
		ChannelFuture future =null;
		future = e.getChannel().write(response);
		String temp ="";
		
		if(future.isSuccess()==false)
			return;
		
		e.getChannel().write(new ChunkedInput() {
			
			int i = 0;
			public Object nextChunk() throws Exception {
				i++;
				String temp ="";
				//temp ="--XXoXoX\r\n";
				temp = "Content-Type: text/plain \r\n\r\n";
				temp += "{\"data\":\""+i+"\"}";
				temp +="\r\n--XXoXoX\r\n";
				ChannelBuffer buf2 =
						ChannelBuffers.copiedBuffer(temp.toString(), "UTF-8");
				Thread.sleep(100);
				return buf2;
			}
			
			@Override
			public boolean hasNextChunk() throws Exception {
					return true;
			}
			
			@Override
			public void close() throws Exception {
				// TODO Auto-generated method stub
				
			}
		});
		
		if (close) {
			future.addListener(ChannelFutureListener.CLOSE);
		}
	}
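
One thing that may be relevant to the blocking: Thread.sleep(100) runs on whichever thread calls nextChunk(), and hasNextChunk() never returns false, so that thread may never become free for other connections. This is an assumption about the cause, not something confirmed here. The sketch below shows the general alternative of producing a part every 100 ms from a timer thread, so the call that starts the stream returns immediately. It uses only java.util.concurrent; PeriodicPartEmitter, start and collect are hypothetical names, and the Consumer<String> sink stands in for whatever actually writes to the channel.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

final class PeriodicPartEmitter {
    // A dedicated timer thread, so the caller is never put to sleep.
    private final ScheduledExecutorService timer =
            Executors.newSingleThreadScheduledExecutor();

    // Schedules one part every periodMillis and returns immediately.
    // The sink is a stand-in for the real channel write (assumption).
    ScheduledFuture<?> start(Consumer<String> sink, long periodMillis) {
        AtomicInteger counter = new AtomicInteger();
        return timer.scheduleAtFixedRate(() -> {
            int i = counter.incrementAndGet();
            sink.accept("Content-Type: text/plain\r\n\r\n"
                    + "{\"data\":\"" + i + "\"}"
                    + "\r\n--XXoXoX\r\n");
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    void shutdown() {
        timer.shutdownNow();
    }

    // Demo helper: waits (up to 5 s) for 'count' parts, then stops the timer.
    static List<String> collect(int count, long periodMillis) {
        PeriodicPartEmitter emitter = new PeriodicPartEmitter();
        List<String> sent = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(count);
        ScheduledFuture<?> task = emitter.start(part -> {
            sent.add(part);
            done.countDown();
        }, periodMillis);
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
        } finally {
            task.cancel(false);
            emitter.shutdown();
        }
        return sent;
    }

    public static void main(String[] args) {
        List<String> parts = collect(3, 100);
        System.out.println("parts sent: " + parts.size());
    }
}
```

With 100 to 1000 clients, one shared timer could serve every open connection, and the ScheduledFuture returned by start(...) would be cancelled when a client disconnects.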

-- 
View this message in context: http://n2.nabble.com/How-to-implement-Streaming-server-like-The-Bayeux-Protocol-tp4055722p4055722.html
Sent from the Netty User Group mailing list archive at Nabble.com.

