Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions examples/bot.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import {
AutojoinRoomsMixin,
LogLevel,
LogService,
MessageEvent,
RichConsoleLogger,
SimpleFsStorageProvider
} from "../src";
import { SyncV3MatrixClient } from "../src/syncv3/SyncV3MatrixClient";

LogService.setLogger(new RichConsoleLogger());
LogService.setLevel(LogLevel.TRACE);
LogService.muteModule("Metrics");
LogService.trace = LogService.debug;

let creds = null;
try {
creds = require("../../examples/storage/bot.creds.json");
} catch (e) {
// ignore
}

const homeserverUrl = creds?.['homeserverUrl'] ?? "http://localhost:8008";
const accessToken = creds?.['accessToken'] ?? 'YOUR_TOKEN';
const storage = new SimpleFsStorageProvider("./examples/storage/bot.json");

const client = new SyncV3MatrixClient(homeserverUrl, accessToken, storage);
AutojoinRoomsMixin.setupOnClient(client);

(async function() {
// client.joinRoom('!dIJnfuNqTABKFEGJVf:localhost');
client.on("room.message", async (roomId: string, event: any) => {
const message = new MessageEvent(event);
if (message.sender === await client.getUserId() || message.messageType === "m.notice") return;

if (message.textBody.startsWith("!hello ")) {
await client.replyText(roomId, event, message.textBody.substring("!hello ".length).trim() || "Hello!");
}
});

await client.start();
LogService.info("index", "Client started! Running as " + (await client.getUserId()));
})();
1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
"lint": "eslint 'src/**/*.ts'",
"test": "ts-mocha --project ./tsconfig.json test/*Test.ts test/**/*.ts",
"build:examples": "tsc -p tsconfig-examples.json",
"example:bot": "yarn build:examples && node lib/examples/bot.js",
"example:appservice": "yarn build:examples && node lib/examples/appservice.js",
"example:login_register": "yarn build:examples && node lib/examples/login_register.js",
"example:encryption_bot": "yarn build:examples && node lib/examples/encryption_bot.js",
Expand Down
2 changes: 1 addition & 1 deletion src/MatrixClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ export class MatrixClient extends EventEmitter {
private joinStrategy: IJoinRoomStrategy = null;
private eventProcessors: { [eventType: string]: IPreprocessor[] } = {};
private filterId = 0;
private stopSyncing = false;
protected stopSyncing = false;
private metricsInstance: Metrics = new Metrics();
private unstableApisInstance = new UnstableApis(this);

Expand Down
104 changes: 104 additions & 0 deletions src/syncv3/SyncV3MatrixClient.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
import { MatrixClient } from "../MatrixClient";
import { extractRequestError, LogService } from "../logging/LogService";
import { IStorageProvider } from "../storage/IStorageProvider";
import { SyncV3Response } from "./models";
import { IV3List, ListBehaviour, SortBehaviour, V3List } from "./V3List";

/**
* A MatrixClient class which attempts to use Sync V3 instead of the normal sync protocol
* on the server.
*
* This class is considered <b>UNSTABLE</b> and may be removed at any time. This class will
* not check for server support before attempting to use it.
*
* @category Unstable: Sync V3
*/
export class SyncV3MatrixClient extends MatrixClient {
private lastPos: string;
private lists: V3List[] = [];

/**
* Creates a new matrix client
* @param {string} homeserverUrl The homeserver's client-server API URL
* @param {string} accessToken The access token for the homeserver
* @param {IStorageProvider} storage The storage provider to use.
* @param {SortBehaviour[]} sortBehaviours The list sorting behaviour to use.
*/
public constructor(homeserverUrl: string, accessToken: string, storage: IStorageProvider, sortBehaviours: SortBehaviour[] = [SortBehaviour.Name]) {
super(homeserverUrl, accessToken, storage);

this.lists = [
new V3List(ListBehaviour.JoinedOnly, sortBehaviours),
new V3List(ListBehaviour.InvitedOnly, sortBehaviours),
new V3List(ListBehaviour.DMsOnly, sortBehaviours),
];
}

public get joinedList(): IV3List {
return this.lists[0];
}

public get invitedList(): IV3List {
return this.lists[1];
}

public get directMessagesList(): IV3List {
return this.lists[2];
}

protected async startSyncInternal(): Promise<any> {
this.lastPos = await this.storageProvider.getSyncToken();
for (let i = 0; i < this.lists.length; i++) {
const value = JSON.parse((await this.storageProvider.readValue(`sync_v3_list_${i}`)) || "{}");
await this.lists[i].processOperations(this, value.count ?? 0, value.ops ?? []);
}
this.loopSync();
}

private loopSync() {
const promiseWhile = async () => {
if (this.stopSyncing) {
LogService.info("MatrixClientLite", "Client stop requested - stopping sync");
return;
}

try {
const response = await this.doSyncV3();
await this.processSyncV3(response);
this.lastPos = response.pos;
await this.storageProvider.setSyncToken(this.lastPos);
} catch (e) {
LogService.error("MatrixClientLite", "Error handling sync " + extractRequestError(e));
const backoffTime = 5000 + Math.random() * (15000 - 5000); // TODO: de-hardcode values. SYNC_BACKOFF_MIN_MS SYNC_BACKOFF_MAX_MS
LogService.info("MatrixClientLite", `Backing off for ${backoffTime}ms`);
await new Promise((r) => setTimeout(r, backoffTime));
}

return promiseWhile();
};

promiseWhile(); // start loop async
}

private async processSyncV3(sync: SyncV3Response): Promise<void> {
for (let i = 0; i < this.lists.length; i++) {
const ops = sync.ops.filter(o => o.list === i);
await this.lists[i].processOperations(this, sync.counts[i], ops);
await this.storageProvider.storeValue(`sync_v3_list_${i}`, JSON.stringify({
ops: this.lists[i].lossySerialized,
count: this.lists[i].totalRoomCount,
}));
}
}

private async doSyncV3(): Promise<SyncV3Response> {
const userId = await this.getUserId();
const qs = {};
if (this.lastPos) qs['pos'] = this.lastPos;
return this.doRequest("POST", "/_matrix/client/unstable/org.matrix.msc3575/sync", qs, {
lists: this.lists.map(l => l.getDefinitionFor(userId)),

// TODO: Support extensions for crypto and such
});
}
}
179 changes: 179 additions & 0 deletions src/syncv3/V3List.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
import { IV3Room, V3Room } from "./V3Room";
import { Operation, OpSync, SyncV3Operation } from "./operations";
import { SyncV3List, SyncV3OperationRoom, SyncV3Room } from "./models";
import { MatrixClient } from "../MatrixClient";

/**
* @category Unstable: Sync V3
*/
export enum ListBehaviour {
JoinedOnly,
InvitedOnly,
DMsOnly,
}

/**
* @category Unstable: Sync V3
*/
export enum SortBehaviour {
NotificationCount = "by_notification_count",
Recency = "by_recency",
Name = "by_name",
}

/**
* @category Unstable: Sync V3
*/
export const CURRENT_USER_STATE_KEY = "@CURRENT";

/**
* @category Unstable: Sync V3
*/
export type RequiredStateTuple = [string, string];

// Unclear if it's even worth changing this to be smaller/bigger, but let's start here.
const WINDOW_SIZE = 1000;

const TIMELINE_LIMIT = 20; // make configurable?

/**
* @category Unstable: Sync V3
*/
export interface IV3List {
totalRoomCount: number;
orderedKnownRooms: IV3Room[];
unorderedAllRooms: IV3Room[];
}

/**
* @category Unstable: Sync V3
*/
export class V3List implements IV3List {
private roomsByRange = new Map<string, V3Room[]>(); // key is JSON-serialized range
private roomMap = new Map<string, V3Room>(); // key is room ID
private totalCount = 0;

public constructor(
public readonly behaviour: ListBehaviour,
public readonly sort: SortBehaviour[],
) {
}

public get lossySerialized(): Omit<OpSync, "list">[] {
return Array.from(this.roomsByRange.entries()).map(([serializedRange, rooms]) => ({
op: Operation.Sync,
range: JSON.parse(serializedRange),
rooms: rooms.filter(r => !!r).map(r => r.lossySerialized),
}));
}

public get totalRoomCount(): number {
return this.totalCount;
}

public get orderedKnownRooms(): V3Room[] {
// "Just because you can chain functions, doesn't mean you should" ~ CS Instructor
return Array.from(this.roomsByRange.entries())
.map(([serializedRange, rooms]) => [JSON.parse(serializedRange)[0], rooms] as [number, V3Room[]])
.sort((a, b) => a[0] - b[0])
.map(e => e[1])
.reduce((p, c) => [...p, ...c], []);
}

public get unorderedAllRooms(): V3Room[] {
return Array.from(this.roomMap.values());
}

private findRange(index: number): [number, number] {
return [
Math.floor(index / WINDOW_SIZE) * WINDOW_SIZE,
Math.floor((index + WINDOW_SIZE) / WINDOW_SIZE) * WINDOW_SIZE,
];
}

private findIndexInRange(sourceIndex: number, range: [number, number]): number {
return sourceIndex - range[0];
}

public getDefinitionFor(currentUserId: string): SyncV3List {
const ranges: [number, number][] = [];
for (let i = 0; i <= Math.ceil(this.totalCount / WINDOW_SIZE); i++) {
ranges.push([i * WINDOW_SIZE, (i + 1) * WINDOW_SIZE]);
}
return {
filters: {
is_dm: this.behaviour === ListBehaviour.DMsOnly,
is_invite: this.behaviour === ListBehaviour.InvitedOnly,
},
sort: this.sort,
required_state: V3Room.REQUIRED_STATE.map(t => [t[0], t[1] === CURRENT_USER_STATE_KEY ? currentUserId : t[1]]),
timeline_limit: TIMELINE_LIMIT,
rooms: ranges,
};
}

private async getOrCreateRoom(client: MatrixClient, room: SyncV3OperationRoom): Promise<V3Room> {
if (!this.roomMap.get(room.room_id)) {
this.roomMap.set(room.room_id, new V3Room(room.room_id));
}
const mapped = this.roomMap.get(room.room_id);
if (room.required_state) await mapped.updateState(client, room.required_state);
if (room.timeline) await mapped.updateTimeline(client, room.timeline);
return mapped;
}

private getOrCreateRangeFromIndex(index: number): {range: [number, number], inRangeIdx: number, rooms: V3Room[]} {
const range = this.findRange(index);
const inRangeIdx = this.findIndexInRange(index, range);
let rooms = this.roomsByRange.get(JSON.stringify(range));
if (!rooms) {
rooms = new Array<V3Room>(range[1] - range[0]).fill(null);
this.roomsByRange.set(JSON.stringify(range), rooms);
}
return {range, inRangeIdx, rooms};
}

public async processOperations(client: MatrixClient, totalCount: number, ops: SyncV3Operation[]): Promise<void> {
if (totalCount) this.totalCount = totalCount;
for (const op of ops) {
switch(op.op) {
case Operation.Delete: {
const range = this.findRange(op.index);
const inRangeIdx = this.findIndexInRange(op.index, range);
const rooms = this.roomsByRange.get(JSON.stringify(range));
if (rooms) {
const [deletedRoom] = rooms.splice(inRangeIdx, 1);
this.roomMap.delete(deletedRoom.roomId);
}
break;
}
case Operation.Insert: {
const info = this.getOrCreateRangeFromIndex(op.index);
info.rooms[info.inRangeIdx] = await this.getOrCreateRoom(client, op.room);
break;
}
case Operation.Update: {
const info = this.getOrCreateRangeFromIndex(op.index);
const room = info.rooms[info.inRangeIdx];
if (!room) throw new Error("Failed to handle update operation: unknown room at index. Op: " + JSON.stringify(op));
if (op.room.required_state) await room.updateState(client, op.room.required_state);
if (op.room.timeline) await room.updateTimeline(client, op.room.timeline);
break;
}
case Operation.Sync: {
const rooms = new Array<V3Room>(op.range[1] - op.range[0]).fill(null)
for (let i = 0; i < op.rooms.length; i++) {
if (!op.rooms[i]) continue;
rooms[i] = await this.getOrCreateRoom(client, op.rooms[i]);
}
this.roomsByRange.set(JSON.stringify(op.range), rooms);
break;
}
case Operation.Invalidate: {
this.roomsByRange.delete(JSON.stringify(op.range));
break;
}
}
}
}
}
Loading