Atmosphere streaming problem #145

Open
zsenyeg opened this issue Apr 9, 2017 · 2 comments

zsenyeg commented Apr 9, 2017

Hi there!

I have a problem using wasync in my project. Project architecture and configuration:

Server side:
WebSphere 8.5, Atmosphere Framework 2.3.8, transport STREAMING, because WebSocket is not currently supported by WAS. The server side is a standard @ManagedService:

import org.atmosphere.config.service.Disconnect;
import org.atmosphere.config.service.ManagedService;
import org.atmosphere.config.service.Ready;
import org.atmosphere.cpr.AtmosphereResource;
import org.atmosphere.cpr.AtmosphereResourceEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.databind.ObjectMapper;

@ManagedService(path = "/atmosphere/push")
public class AllexaPushService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AllexaPushService.class);

    private ObjectMapper objectMapper = new ObjectMapper();

    @Ready
    public void onReady(final AtmosphereResource r) {
	LOGGER.info("Client {} connected.", r.uuid());
	try {
	    r.getBroadcaster().broadcast(objectMapper.writeValueAsString(new ConnectionStateChangeEvent(r.uuid(), true)), r);
	} catch (Exception e) {
	    LOGGER.error(e.getMessage(), e);
	}
    }

    @Disconnect
    public void onDisconnect(AtmosphereResourceEvent event) {
	if (event.isCancelled()) {
	    LOGGER.info("Client {} unexpectedly disconnected", event.getResource().uuid());
	} else if (event.isClosedByClient()) {
	    LOGGER.info("Client {} closed the connection", event.getResource().uuid());
	}
    }
}

web.xml

	<servlet>
        <description>AtmosphereServlet</description>
        <servlet-name>AtmosphereServlet</servlet-name>
        <servlet-class>org.atmosphere.cpr.AtmosphereServlet</servlet-class>
        <init-param>
        	<param-name>org.atmosphere.cpr.broadcaster.maxAsyncWriteThreads</param-name>
        	<param-value>5</param-value>
        </init-param>
        <init-param>
        	<param-name>org.atmosphere.cpr.broadcaster.maxProcessingThreads</param-name>
        	<param-value>2</param-value>
        </init-param>
        <init-param>
		    <param-name>org.atmosphere.disableOnStateEvent</param-name>
    		<param-value>true</param-value>
        </init-param>
 		<init-param>
            <param-name>org.atmosphere.useWebSocket</param-name>
            <param-value>false</param-value>
        </init-param>
        <init-param>
            <param-name>org.atmosphere.cpr.asyncSupport</param-name>
            <param-value>org.atmosphere.container.Servlet30CometSupport</param-value>
        </init-param>
		<init-param>
      		<param-name>org.atmosphere.interceptor.HeartbeatInterceptor.heartbeatFrequencyInSeconds</param-name>
      		<param-value>20</param-value>
   		</init-param>
		<init-param>
      		<param-name>org.atmosphere.cpr.CometSupport.maxInactiveActivity</param-name>
      		<param-value>1800000</param-value>
   		</init-param>
		<init-param>
			<param-name>com.ibm.ws.webcontainer.async-supported</param-name>
			<param-value>true</param-value>
		</init-param>
		<init-param>
			<param-name>org.atmosphere.cpr.AtmosphereInterceptor.disable</param-name>
			<param-value>org.atmosphere.client.TrackMessageSizeInterceptor</param-value>
		</init-param>
		<!-- 
        <init-param>
        	<param-name>org.atmosphere.cpr.objectFactory</param-name>
        	<param-value>org.atmosphere.cdi.CDIObjectFactory</param-value>
        </init-param>
         -->
        <load-on-startup>1</load-on-startup>
        <async-supported>true</async-supported>
    </servlet>

Client side wasync related code:

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import org.atmosphere.wasync.ClientFactory;
import org.atmosphere.wasync.Decoder;
import org.atmosphere.wasync.Event;
import org.atmosphere.wasync.Function;
import org.atmosphere.wasync.Request.METHOD;
import org.atmosphere.wasync.Socket;
import org.atmosphere.wasync.impl.AtmosphereClient;
import org.atmosphere.wasync.impl.AtmosphereRequest.AtmosphereRequestBuilder;
import org.atmosphere.wasync.impl.DefaultOptions;
import org.atmosphere.wasync.impl.DefaultOptionsBuilder;
import org.codehaus.jackson.map.ObjectMapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ning.http.client.AsyncHttpClient;

import hu.danubius.allexa.be.business.helper.HttpClientFactory;
import hu.danubius.allexa.be.business.http.AllexaHttpClientBean;
import hu.danubius.allexa.be.dto.websocket.event.ConnectionStateChangeEvent;
import hu.danubius.allexa.util.helper.ObjectMapperFactory;

public class PushServiceTest {
    private static final Logger LOGGER = LoggerFactory.getLogger(PushServiceTest.class);

    private Socket socket;
    private DefaultOptions options;

    public void init() {
	try {
	    AsyncHttpClient asyncHttpClient = HttpClientFactory.createHttpClient(5000, "wAsync/2.0");

	    AtmosphereClient client = ClientFactory.getDefault().newClient(AtmosphereClient.class);
	    final AtmosphereRequestBuilder requestBuilder = client.newRequestBuilder();
	    requestBuilder.method(METHOD.GET);
	    String uri = "...";
	    requestBuilder.uri(uri);
	    requestBuilder.header("Accept", "application/json").header("Accept-Charset", AllexaHttpClientBean.CHARSET);
	    requestBuilder.transport(org.atmosphere.wasync.Request.TRANSPORT.LONG_POLLING);

	    DefaultOptionsBuilder optBuilder = client.newOptionsBuilder();
	    optBuilder.reconnect(true);
	    optBuilder.runtime(asyncHttpClient, false);
	    optBuilder.pauseBeforeReconnectInSeconds(10);
	    optBuilder.reconnectAttempts(-1);
	    this.options = optBuilder.build();
	    requestBuilder.decoder(new Decoder<String, Object>() {
		@Override
		public Object decode(Event event, String data) {
			...
		}
	    });

	    this.socket = client.create(this.options);
	    this.socket.on(Event.MESSAGE, new Function<ConnectionStateChangeEvent>() {
		@Override
		public void on(ConnectionStateChangeEvent response) {
		    LOGGER.info("Connection with uuid: {} was success to central server", response.getUuid());
		}
	    });
	    socket.on(Event.HEADERS, new Function<String>() {
		@Override
		public void on(String t) {
		    LOGGER.info("EVENT:HEADERS");
		}
	    });

	    socket.on(new Function<Throwable>() {
		@Override
		public void on(Throwable t) {
		    LOGGER.info("MESSAGE:THROWABLE");
		    LOGGER.error(t.getMessage(), t);
		}
	    });
	    socket.on(Event.REOPENED, new Function<String>() {
		@Override
		public void on(String t) {
		    LOGGER.info("EVENT:REOPENED");
		}
	    });
	    socket.on(Event.TRANSPORT, new Function<String>() {
		@Override
		public void on(String t) {
		    LOGGER.info("EVENT:TRANSPORT");
		}
	    });
	    socket.on(Event.STATUS, new Function<String>() {
		@Override
		public void on(String t) {
		    LOGGER.info("EVENT:STATUS");
		}
	    });
	    socket.on(Event.OPEN, new Function<String>() {
		@Override
		public void on(String t) {
		    LOGGER.info("EVENT:OPEN");
		}
	    });
	    socket.on(Event.CLOSE, new Function<String>() {
		@Override
		public void on(String t) {
		    LOGGER.info("EVENT:CLOSE");
		}
	    });
	    socket.on(Event.ERROR, new Function<String>() {
		@Override
		public void on(String s) {
		    LOGGER.info("EVENT:ERROR");
		}
	    });
	    socket.open(requestBuilder.build());
	} catch (Exception e) {
	    LOGGER.error("Cannot initialize online state provider: {}", e.getMessage(), e);
	}
    }
}

AsyncHttpClient config:

import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.AsyncHttpClientConfig;
import com.ning.http.client.Realm;

public class HttpClientFactory {
    public static final AsyncHttpClient createHttpClient(int connectionTimeOut, String userAgent) {
	AsyncHttpClientConfig.Builder ccBuilder = new AsyncHttpClientConfig.Builder();
	Realm realm = new Realm.RealmBuilder().setPrincipal("...").setPassword("...").setUsePreemptiveAuth(true).setScheme(Realm.AuthScheme.BASIC).build();
	ccBuilder.setRealm(realm);
	ccBuilder.setAcceptAnyCertificate(true);
	ccBuilder.setFollowRedirect(true);
	ccBuilder.setRequestTimeout(-1);
	ccBuilder.setConnectTimeout(connectionTimeOut);
	ccBuilder.setUserAgent(userAgent);
	ccBuilder.setMaxRequestRetry(0);
	return new AsyncHttpClient(ccBuilder.build());
    }
}

The AsyncHttpClient version is the one pulled in as a Maven dependency of wasync.

My problem is that if I turn off Wi-Fi or unplug the network cable on the client machine, wasync does not detect the network loss. It does nothing and simply stops receiving messages from the server, even after the network connection comes back.

Is this a streaming-related problem, or is it related to my configuration? How can I detect that the network connection is not available? Maybe I should write a ping-like scheduler and detect the socket IOException?

Any help would be a lifesaver.

Cheers,
Zsolt
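
The "ping-like scheduler" idea from the question could be sketched roughly as below. This is only an illustration using plain JDK classes, not part of wasync: the class name `HeartbeatWatchdog`, its methods, and the injected clock are all hypothetical. The idea is to record the time of the last received message and to run a callback when nothing has arrived for longer than a timeout.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.LongSupplier;

// Hypothetical helper, not a wasync class: detects a silent connection.
final class HeartbeatWatchdog implements AutoCloseable {
    private final long timeoutMillis;
    private final LongSupplier clock;   // injected so the logic can be tested without waiting
    private final Runnable onStale;     // e.g. close and reopen the socket
    private final AtomicLong lastSeen;
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "heartbeat-watchdog");
                t.setDaemon(true);
                return t;
            });

    HeartbeatWatchdog(long timeoutMillis, LongSupplier clock, Runnable onStale) {
        this.timeoutMillis = timeoutMillis;
        this.clock = clock;
        this.onStale = onStale;
        this.lastSeen = new AtomicLong(clock.getAsLong());
    }

    // Call this whenever any data arrives from the server.
    void touch() {
        lastSeen.set(clock.getAsLong());
    }

    // Returns true and runs the callback if the connection has been silent too long.
    boolean check() {
        if (clock.getAsLong() - lastSeen.get() > timeoutMillis) {
            touch(); // restart the window so the callback does not fire on every tick
            onStale.run();
            return true;
        }
        return false;
    }

    // Runs check() periodically on a daemon thread.
    void start(long periodMillis) {
        scheduler.scheduleAtFixedRate(() -> {
            try {
                check();
            } catch (RuntimeException e) {
                // swallow so that one failing callback does not cancel the schedule
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

With the configuration above, one option would be to call `touch()` from the `Event.MESSAGE` function and to pick a timeout of a few multiples of the 20-second server heartbeat. Whether heartbeat frames actually reach the message handler is an assumption that would need checking; if the client filters them out, an application-level ping message from the server would be needed instead.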

Member

jfarcand commented Apr 10, 2017

@zsenyeg Can you put a breakpoint on the line below and debug from there?

if (force && closed.getAndSet(true)) return;

Author

zsenyeg commented May 24, 2017

Hey!

Thanks for your suggestion. I tried debugging the problem: the close method is not invoked when the Wi-Fi connection is lost.

Another strange (or perhaps expected) behaviour is that automatic reconnection does not work when the central server is down. Here's the warning from the log:

2017-05-24 11:03:42 - [New I/O boss #9] - WARN  o.a.wasync.transport.StreamTransport - StreamTransport notified with exception java.net.ConnectException: Connection refused: no further information: was.example.com/127.0.0.1:9443 for request : https://was.example.com:9443/allexa-be-gj-expert/atmosphere/push
2017-05-24 11:03:42 - [New I/O boss #9] - WARN  o.a.wasync.transport.StreamTransport - 
java.net.ConnectException: Connection refused: no further information: was.example.com/127.0.0.1:9443
	at com.ning.http.client.providers.netty.request.NettyConnectListener.onFutureFailure(NettyConnectListener.java:131) [async-http-client-1.9.28.jar:na]
	at com.ning.http.client.providers.netty.request.NettyConnectListener.operationComplete(NettyConnectListener.java:143) [async-http-client-1.9.28.jar:na]
	at org.jboss.netty.channel.DefaultChannelFuture.notifyListener(DefaultChannelFuture.java:409) [netty-3.10.3.Final.jar:na]
	at org.jboss.netty.channel.DefaultChannelFuture.notifyListeners(DefaultChannelFuture.java:400) [netty-3.10.3.Final.jar:na]
	at org.jboss.netty.channel.DefaultChannelFuture.setFailure(DefaultChannelFuture.java:362) [netty-3.10.3.Final.jar:na]
	at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:109) [netty-3.10.3.Final.jar:na]
	at org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79) [netty-3.10.3.Final.jar:na]
	at org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337) [netty-3.10.3.Final.jar:na]
	at org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42) [netty-3.10.3.Final.jar:na]
	at org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108) [netty-3.10.3.Final.jar:na]
	at org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42) [netty-3.10.3.Final.jar:na]
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_102]
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_102]
	at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102]
Caused by: java.net.ConnectException: Connection refused: no further information: was.example.com/127.0.0.1:9443
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_102]
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_102]
	at org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152) [netty-3.10.3.Final.jar:na]
	at org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105) [netty-3.10.3.Final.jar:na]
	... 8 common frames omitted
2017-05-24 11:03:42 - [New I/O boss #9] - INFO  h.d.a.b.b.OnlineStateProviderBean - MESSAGE:THROWABLE

Is it intended that there is no automatic reconnection after the Throwable handler is called?
When the central server is up but the application is down, the log shows the following message, and automatic reconnection works:

2017-05-24 11:03:31 - [New I/O worker #5] - ERROR h.d.a.b.b.OnlineStateProviderBean - Unexpected message Error 404: com.ibm.ws.webcontainer.servlet.exception.NoTargetForURIException: No target servlet configured for uri: /allexa-be-gj-expert/atmosphere/push

What am I doing wrong?

Cheers,
Zsolt
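
If the built-in reconnect does not fire after a refused connection, one possible workaround is to retry the open call manually from the Throwable handler. The sketch below shows only the retry logic, with a capped exponential backoff, using plain JDK classes. `ReconnectLoop`, `backoffMillis` and `connectWithRetry` are hypothetical names, and the `connect` callable stands in for whatever code reopens the socket; none of this is wasync API.

```java
import java.util.concurrent.Callable;
import java.util.function.LongConsumer;

// Hypothetical helper, not a wasync class: retries a connect attempt with backoff.
final class ReconnectLoop {

    // Delay before the next try: base * 2^attempt (attempt is 0-based), capped at maxMillis.
    static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    // Default sleeper: restores the interrupt flag instead of throwing.
    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Returns the number of attempts used on success, or -1 if it gave up.
    // A negative maxAttempts retries forever; at least one attempt is always made.
    static int connectWithRetry(Callable<Boolean> connect, int maxAttempts,
                                long baseMillis, long maxMillis, LongConsumer sleeper) {
        int attempt = 0;
        while (true) {
            try {
                if (Boolean.TRUE.equals(connect.call())) {
                    return attempt + 1;
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return -1;
            } catch (Exception e) {
                // e.g. java.net.ConnectException while the server is down: fall through and retry
            }
            attempt++;
            if (maxAttempts >= 0 && attempt >= maxAttempts) {
                return -1;
            }
            sleeper.accept(backoffMillis(attempt - 1, baseMillis, maxMillis));
            if (Thread.currentThread().isInterrupted()) {
                return -1;
            }
        }
    }
}
```

A call such as `ReconnectLoop.connectWithRetry(connect, -1, 10_000L, 60_000L, ReconnectLoop::sleepQuietly)` would mirror the `reconnectAttempts(-1)` and 10-second pause from the options above. It should run on its own thread rather than on the I/O thread that delivered the exception.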
