Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add static location and live location support #1423

Draft
wants to merge 6 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
106 changes: 106 additions & 0 deletions src/channel.ts
Original file line number Diff line number Diff line change
@@ -61,6 +61,7 @@
PartialUpdateMemberAPIResponse,
AIState,
MessageOptions,
Attachment,
} from './types';
import { Role } from './permissions';
import { DEFAULT_QUERY_CHANNEL_MESSAGE_LIST_PAGE_SIZE } from './constants';
@@ -471,6 +472,111 @@
return data;
}

public async sendStaticLocation(
attachmentMetadata: { latitude: number; longitude: number } & Attachment<StreamChatGenerics>,
) {
const { latitude, longitude } = attachmentMetadata;

const message: Message = {
attachments: [
{
...attachmentMetadata,
type: 'static_location',
latitude,
longitude,
},
],
};

return await this.sendMessage(message);
}

public async startLiveLocationSharing(
attachmentMetadata: { end_time: string; latitude: number; longitude: number } & Attachment<StreamChatGenerics>,
) {
const client = this.getClient();
if (!client.userID) return;

const { latitude, longitude, end_time } = attachmentMetadata;

const message: Message = {
attachments: [
{
...attachmentMetadata,
type: 'live_location',
latitude,
longitude,
end_time,
},
],
};

// FIXME: this is wrong and could easily be walked around by integrators
const existing = await this.getClient().search(
// @ts-ignore
{
cid: this.cid,
},
{
$and: [
{ 'attachments.type': { $eq: 'live_location' } },
// has not been manually stopped
{
'attachments.stopped_sharing': {
$nin: [true],
},
},
// has not ended
{
'attachments.end_time': {
$gt: new Date().toISOString(),
},
},
],
},
);

const promises: Promise<any>[] = [];

Check warning on line 539 in src/channel.ts

GitHub Actions / lint

Unexpected any. Specify a different type

for (const result of existing.results) {
const [attachment] = result.message.attachments ?? [];

promises.push(
client.partialUpdateMessage(result.message.id, {
// @ts-expect-error
set: {
attachments: [
{
...attachment,
stopped_sharing: true,
},
],
},
}),
);
}

// FIXME: sending message if the previous part failed/did not happen
// should result in BE error
promises.unshift(this.sendMessage(message));

const [response] = await Promise.allSettled(promises);

if (response.status === 'fulfilled') {
this.getClient().dispatchEvent({ message: response.value.message, type: 'live_location_sharing.started' });
}
}

public async stopLiveLocationSharing(message: MessageResponse<StreamChatGenerics>) {
const [attachment] = message.attachments ?? [];
const response = await this.getClient().partialUpdateMessage(message.id, {
// @ts-expect-error this is a valid update
set: { attachments: [{ ...attachment, stopped_sharing: true }] },
});

this.getClient().dispatchEvent({ message: response.message, type: 'live_location_sharing.stopped' });
}

/**
* delete - Delete the channel. Messages are permanently removed.
*
18 changes: 18 additions & 0 deletions src/client.ts
Original file line number Diff line number Diff line change
@@ -2552,6 +2552,24 @@ export class StreamChat<StreamChatGenerics extends ExtendableGenerics = DefaultG
return messageId;
}

public updateLiveLocation(
message: MessageResponse<StreamChatGenerics>,
{ latitude, longitude }: { latitude: number; longitude: number },
) {
const [attachment] = message.attachments ?? [];

if (!attachment || attachment.type !== 'live_location') {
throw new Error(
'Supplied message either has no attachments to update or attachment is not of type "live_location"',
);
}

return this.partialUpdateMessage(message.id, {
// @ts-expect-error valid update
set: { attachments: [{ ...attachment, latitude, longitude }] },
});
}

/**
* pinMessage - pins the message
* @param {string | { id: string }} messageOrMessageId message object or message id
117 changes: 117 additions & 0 deletions src/concurrency.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
interface PendingPromise {
onContinued: () => void;
promise: Promise<unknown>;
}

type AsyncWrapper<P extends unknown[], T> = (
tag: string | symbol,
cb: (...args: P) => Promise<T>,
) => {
cb: () => Promise<T>;
onContinued: () => void;
};

/**
* Runs async functions serially. Useful for wrapping async actions that
* should never run simultaneously: if marked with the same tag, functions
* will run one after another.
*
* @param tag Async functions with the same tag will run serially. Async functions
* with different tags can run in parallel.
* @param cb Async function to run.
* @returns Promise that resolves when async functions returns.
*/
export const withoutConcurrency = createRunner(wrapWithContinuationTracking);

/**
* Runs async functions serially, and cancels all other actions with the same tag
* when a new action is scheduled. Useful for wrapping async actions that override
* each other (e.g. enabling and disabling camera).
*
* If an async function hasn't started yet and was canceled, it will never run.
* If an async function is already running and was canceled, it will be notified
* via an abort signal passed as an argument.
*
* @param tag Async functions with the same tag will run serially and are canceled
* when a new action with the same tag is scheduled.
* @param cb Async function to run. Receives AbortSignal as the only argument.
* @returns Promise that resolves when async functions returns. If the function didn't
* start and was canceled, will resolve with 'canceled'. If the function started to run,
* it's up to the function to decide how to react to cancelation.
*/
export const withCancellation = createRunner(wrapWithCancellation);

const pendingPromises = new Map<string | symbol, PendingPromise>();

export function hasPending(tag: string | symbol) {
return pendingPromises.has(tag);
}

export async function settled(tag: string | symbol) {
await pendingPromises.get(tag)?.promise;
}

/**
* Implements common functionality of running async functions serially, by chaining
* their promises one after another.
*
* Before running, async function is "wrapped" using the provided wrapper. This wrapper
* can add additional steps to run before or after the function.
*
* When async function is scheduled to run, the previous function is notified
* by calling the associated onContinued callback. This behavior of this callback
* is defined by the wrapper.
*/
function createRunner<P extends unknown[], T>(wrapper: AsyncWrapper<P, T>) {
return function run(tag: string | symbol, cb: (...args: P) => Promise<T>) {
const { cb: wrapped, onContinued } = wrapper(tag, cb);
const pending = pendingPromises.get(tag);
pending?.onContinued();
const promise = pending ? pending.promise.then(wrapped, wrapped) : wrapped();
pendingPromises.set(tag, { promise, onContinued });
return promise;
};
}

/**
* Wraps an async function with an additional step run after the function:
* if the function is the last in the queue, it cleans up the whole chain
* of promises after finishing.
*/
function wrapWithContinuationTracking<T>(tag: string | symbol, cb: () => Promise<T>) {
let hasContinuation = false;
const wrapped = () =>
cb().finally(() => {
if (!hasContinuation) {
pendingPromises.delete(tag);
}
});
const onContinued = () => (hasContinuation = true);
return { cb: wrapped, onContinued };
}

/**
* Wraps an async function with additional functionalilty:
* 1. Associates an abort signal with every function, that is passed to it
* as an argument. When a new function is scheduled to run after the current
* one, current signal is aborted.
* 2. If current function didn't start and was aborted, in will never start.
* 3. If the function is the last in the queue, it cleans up the whole chain
* of promises after finishing.
*/
function wrapWithCancellation<T>(tag: string | symbol, cb: (signal: AbortSignal) => Promise<T | 'canceled'>) {
const ac = new AbortController();
const wrapped = () => {
if (ac.signal.aborted) {
return Promise.resolve('canceled' as const);
}

return cb(ac.signal).finally(() => {
if (!ac.signal.aborted) {
pendingPromises.delete(tag);
}
});
};
const onContinued = () => ac.abort();
return { cb: wrapped, onContinued };
}
2 changes: 2 additions & 0 deletions src/events.ts
Original file line number Diff line number Diff line change
@@ -59,4 +59,6 @@ export const EVENT_MAP = {
'connection.recovered': true,
'transport.changed': true,
'capabilities.changed': true,
'live_location_sharing.started': true,
'live_location_sharing.stopped': true,
};
1 change: 1 addition & 0 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -18,4 +18,5 @@ export * from './thread';
export * from './thread_manager';
export * from './token_manager';
export * from './types';
export * from './live_location_manager';
export { isOwnUser, chatCodes, logChatPromiseExecution, formatMessage } from './utils';
Loading