Skip to content

Commit

Permalink
feat: support heartbeat for ws
Browse files Browse the repository at this point in the history
  • Loading branch information
czy88840616 committed Jan 20, 2024
1 parent 0f54741 commit fdcefc3
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 3 deletions.
5 changes: 4 additions & 1 deletion packages/ws/src/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@ import { Configuration } from '@midwayjs/core';
importConfigs: [
{
default: {
webSocket: {},
webSocket: {
enableServerHeartbeatCheck: false,
serverHeartbeatInterval: 30000,
},
},
},
],
Expand Down
29 changes: 28 additions & 1 deletion packages/ws/src/framework.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ export class MidwayWSFramework extends BaseFramework<
IMidwayWSConfigurationOptions
> {
server: http.Server;
protected heartBeatInterval: NodeJS.Timeout;
protected connectionMiddlewareManager = this.createMiddlewareManager();

configure(): IMidwayWSConfigurationOptions {
Expand Down Expand Up @@ -95,6 +96,9 @@ export class MidwayWSFramework extends BaseFramework<
this.logger.info(
`[midway:ws] WebSocket server port = ${this.configurationOptions.port} start success.`
);
if (this.configurationOptions.enableServerHeartbeatCheck) {
this.startHeartBeat();
}
resolve();
});
});
Expand Down Expand Up @@ -138,6 +142,13 @@ export class MidwayWSFramework extends BaseFramework<
this.app.on(
'connection',
async (socket: IMidwayWSContext, request: http.IncomingMessage) => {
socket.isAlive = true;
socket.on('error', error => {
this.logger.error(`socket got error: ${error}`);
});
socket.on('pong', () => {
socket.isAlive = true;
});
// create request context
this.app.createAnonymousContext(socket);
socket.requestContext.registerObject('socket', socket);
Expand Down Expand Up @@ -277,10 +288,13 @@ export class MidwayWSFramework extends BaseFramework<
);

this.app.on('error', err => {
this.logger.error('socket server close', err);
this.logger.error('socket server got error', err);
});

this.app.on('close', () => {
if (this.heartBeatInterval) {
clearInterval(this.heartBeatInterval);
}
this.logger.info('socket server close');
});
}
Expand Down Expand Up @@ -333,6 +347,19 @@ export class MidwayWSFramework extends BaseFramework<
> {
return this.connectionMiddlewareManager;
}

public startHeartBeat() {
this.heartBeatInterval = setInterval(() => {
this.app.clients.forEach((socket: IMidwayWSContext) => {
if (socket.isAlive === false) {
debug('[ws]: socket terminate');
return socket.terminate();
}
socket.isAlive = false;
socket.ping();
});
}, this.configurationOptions.serverHeartbeatInterval);
}
}

function formatResult(result) {
Expand Down
9 changes: 9 additions & 0 deletions packages/ws/src/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,19 @@ export type IMidwayWSApplication = IMidwayApplication<IMidwayWSContext, {
export type IMidwayWSConfigurationOptions = {
pubClient?: any;
subClient?: any;
/**
* enable server heartbeat check, default is false
*/
enableServerHeartbeatCheck?: boolean;
/**
* server heartbeat interval, default is 30000ms
*/
serverHeartbeatInterval?: number;
} & Partial<WebSocket.ServerOptions> & IConfigurationOptions;

export type IMidwayWSContext = IMidwayContext<WebSocket & {
app: IMidwayWSApplication;
isAlive: boolean;
}>;

export type Application = IMidwayWSApplication;
Expand Down
3 changes: 3 additions & 0 deletions packages/ws/test/fixtures/base-app-heartbeat/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"name": "ali-demo"
}
25 changes: 25 additions & 0 deletions packages/ws/test/fixtures/base-app-heartbeat/src/configuration.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
import { Configuration, App } from '@midwayjs/core';
import { ILifeCycle } from '@midwayjs/core';
import { Application } from '../../../../src';

@Configuration({
importConfigs: [
{
default: {
webSocket: {
port: 3000,
enableServerHeartbeatCheck: true,
serverHeartbeatInterval: 1000,
}
}
}
]
})
export class AutoConfiguration implements ILifeCycle {

@App()
app: Application;

async onReady() {
}
}
10 changes: 10 additions & 0 deletions packages/ws/test/fixtures/base-app-heartbeat/src/service/user.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { Provide } from '@midwayjs/core';

@Provide()
export class UserService {
async hello(name) {
return {
name,
};
}
}
38 changes: 38 additions & 0 deletions packages/ws/test/fixtures/base-app-heartbeat/src/socket/api.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import {
Inject,
OnWSConnection,
OnWSDisConnection,
OnWSMessage,
Provide,
WSController,
} from '@midwayjs/core';
import { UserService } from '../service/user';
import { IMidwayWSContext } from '../../../../../src';
import * as assert from 'assert';

@Provide()
@WSController()
export class APIController {
@Inject()
ctx: IMidwayWSContext;

@Inject()
userService: UserService;

@OnWSConnection()
init(socket, request) {
console.log(`namespace / got a connection ${this.ctx.readyState}`);
assert(this.ctx.readyState === socket.readyState);
assert(request);
}

@OnWSMessage('message')
async gotMyMessage(data) {
return { name: 'harry', result: parseInt(data) + 5 };
}

@OnWSDisConnection()
disconnect(id: number) {
console.log('disconnect ' + id);
}
}
4 changes: 3 additions & 1 deletion packages/ws/test/fixtures/base-app/src/configuration.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,9 @@ import { Application } from '../../../../src';
{
default: {
webSocket: {
port: 3000
port: 3000,
enableServerHeartbeatCheck: true,
serverHeartbeatInterval: 1000,
}
}
}
Expand Down
33 changes: 33 additions & 0 deletions packages/ws/test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ describe('/test/index.test.ts', () => {
const app = await createServer('base-app');
const client = await createWebSocketClient(`ws://localhost:3000`);

client.on('ping', () => {
console.log('got ping');
});

client.send(1);
let gotEvent = once(client, 'message');
let [data] = await gotEvent;
Expand Down Expand Up @@ -76,4 +80,33 @@ describe('/test/index.test.ts', () => {
await client.close();
await closeApp(app);
});

it('should test heartbeat timeout and terminate', async () => {
const app = await createServer('base-app-heartbeat');
const client = await createWebSocketClient(`ws://localhost:3000`);

client.on('ping', () => {
console.log('got ping');
});

client.send(1);
let gotEvent = once(client, 'message');
let [data] = await gotEvent;
expect(JSON.parse(data)).toEqual({
name: 'harry',
result: 6,
});

await sleep(2000);

// 客户端终止后,服务端会收到 disconnect 事件
client.terminate();

await sleep(2000);

// 看一下服务端的 clients
expect(app.clients.size).toEqual(0);

await closeApp(app);
});
});
50 changes: 50 additions & 0 deletions site/docs/extensions/ws.md
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,56 @@ export class HomeController {



## 心跳检查

有时服务器和客户端之间的连接可能会中断,服务器和客户端都不知道连接的断开情况。

可以通过启用 `enableServerHeartbeatCheck` 配置心跳检查主动断开请求。

```typescript
// src/config/config.default
export default {
// ...
webSocket: {
enableServerHeartbeatCheck: true,
},
}
```

默认检查时间为 `30*1000` 毫秒,可以通过 `serverHeartbeatInterval` 进行修改,配置单位为毫秒。

```typescript
// src/config/config.default
export default {
// ...
webSocket: {
serverHeartbeatInterval: 30000,
},
}
```

这一配置每隔一段时间会自动发送 `ping` 包,客户端若没有在下一个时间间隔返回消息,则会被自动 `terminate`

客户端如果希望知道服务端的状态,可以通过监听 `ping` 消息来实现。

```typescript
import WebSocket from 'ws';

function heartbeat() {
clearTimeout(this.pingTimeout);

// 每次接收 ping 之后,延迟等待,如果下一次未拿到服务端 ping 消息,则认为出现问题
this.pingTimeout = setTimeout(() => {
// 重连或者中止
}, 30000 + 1000);
}

const client = new WebSocket('wss://websocket-echo.com/');

// ...
client.on('ping', heartbeat);
```



## 本地测试
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,56 @@ export class HomeController {



## Heartbeat check

Sometimes the connection between the server and the client may be interrupted, and neither the server nor the client is aware of the disconnection.

Heartbeat check proactive disconnect requests can be configured by enabling `enableServerHeartbeatCheck`.

```typescript
// src/config/config.default
export default {
// ...
webSocket: {
enableServerHeartbeatCheck: true,
},
}
```

The default check time is `30*1000` milliseconds, which can be modified through `serverHeartbeatInterval`, and the configuration unit is milliseconds.

```typescript
// src/config/config.default
export default {
// ...
webSocket: {
serverHeartbeatInterval: 30000,
},
}
```

This configuration will automatically send `ping` packets at regular intervals. If the client does not return a message in the next time interval, it will be automatically `terminate`.

If the client wants to know the status of the server, it can do so by listening to the `ping` message.

```typescript
import WebSocket from 'ws';

function heartbeat() {
clearTimeout(this.pingTimeout);

// After each ping is received, delay and wait. If the server ping message is not received next time, it is considered that there is a problem.
this.pingTimeout = setTimeout(() => {
//Reconnect or abort
}, 30000 + 1000);
}

const client = new WebSocket('wss://websocket-echo.com/');

// ...
client.on('ping', heartbeat);
```



## Local test
Expand Down

0 comments on commit fdcefc3

Please sign in to comment.