Skip to content

Commit ee84218

Browse files
Merge branch 'tulir:main' into feature/membership-request-events
2 parents 2e2ca36 + 4dbbef8 commit ee84218

File tree

21 files changed

+889
-358
lines changed

21 files changed

+889
-358
lines changed

appstate.go

Lines changed: 158 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
// Copyright (c) 2022 Tulir Asokan
1+
// Copyright (c) 2026 Tulir Asokan
22
//
33
// This Source Code Form is subject to the terms of the Mozilla Public
44
// License, v. 2.0. If a copy of the MPL was not distributed with this
@@ -14,6 +14,8 @@ import (
1414
"time"
1515

1616
"github.com/rs/zerolog"
17+
"go.mau.fi/util/exslices"
18+
"go.mau.fi/util/ptr"
1719

1820
"go.mau.fi/whatsmeow/appstate"
1921
waBinary "go.mau.fi/whatsmeow/binary"
@@ -70,25 +72,83 @@ func (cli *Client) fetchAppState(ctx context.Context, name appstate.WAPatchName,
7072
}
7173
for hasMore {
7274
patches, err := cli.fetchAppStatePatches(ctx, name, state.Version, wantSnapshot)
73-
wantSnapshot = false
7475
if err != nil {
7576
return nil, fmt.Errorf("failed to fetch app state %s patches: %w", name, err)
77+
} else if !wantSnapshot && patches.Snapshot != nil {
78+
return nil, fmt.Errorf("server unexpectedly returned snapshot for %s without asking", name)
79+
} else if patches.Snapshot != nil && state != (appstate.HashState{}) {
80+
return nil, fmt.Errorf("unexpected non-empty input state (v%d) for %s when applying snapshot", state.Version, name)
7681
}
82+
wantSnapshot = false
7783
hasMore = patches.HasMorePatches
7884
state, err = cli.applyAppStatePatches(ctx, name, state, patches, fullSync, eventsToDispatchPtr)
7985
if err != nil {
86+
cli.dispatchEvent(&events.AppStateSyncError{Name: name, FullSync: fullSync, Error: err})
8087
return nil, err
8188
}
8289
}
8390
if fullSync {
8491
cli.Log.Debugf("Full sync of app state %s completed. Current version: %d", name, state.Version)
85-
eventsToDispatch = append(eventsToDispatch, &events.AppStateSyncComplete{Name: name})
92+
eventsToDispatch = append(eventsToDispatch, &events.AppStateSyncComplete{Name: name, Version: state.Version})
8693
} else {
8794
cli.Log.Debugf("Synced app state %s from version %d to %d", name, version, state.Version)
8895
}
8996
return eventsToDispatch, nil
9097
}
9198

99+
func (cli *Client) handleAppStateRecovery(
100+
ctx context.Context,
101+
reqID types.MessageID,
102+
result []*waE2E.PeerDataOperationRequestResponseMessage_PeerDataOperationResult,
103+
) bool {
104+
if len(result) == 0 || result[0].GetSyncdSnapshotFatalRecoveryResponse() == nil {
105+
cli.Log.Warnf("No app state recovery data received for %s", reqID)
106+
return true
107+
} else if len(result) > 1 {
108+
cli.Log.Warnf("Unexpected number of app state recovery results for %s: %d", reqID, len(result))
109+
}
110+
var eventsToDispatch []any
111+
eventsToDispatchPtr := &eventsToDispatch
112+
if !cli.EmitAppStateEventsOnFullSync {
113+
eventsToDispatchPtr = nil
114+
}
115+
snapshot, err := appstate.ParseRecovery(result[0].GetSyncdSnapshotFatalRecoveryResponse())
116+
if err != nil {
117+
cli.Log.Warnf("Failed to parse app state recovery blob for %s: %v", reqID, err)
118+
return true
119+
}
120+
name := appstate.WAPatchName(snapshot.GetCollectionName())
121+
version := snapshot.GetVersion().GetVersion()
122+
currentVersion, _, err := cli.Store.AppState.GetAppStateVersion(ctx, string(name))
123+
if err != nil {
124+
cli.Log.Errorf("Failed to get current app state %s version for %s: %v", name, reqID, err)
125+
return true
126+
} else if currentVersion >= version {
127+
cli.Log.Infof("Ignoring app state recovery response for %s as current version %d is newer than or equal to recovery version %d", reqID, currentVersion, snapshot.GetVersion().GetVersion())
128+
return true
129+
}
130+
cli.Log.Debugf("Handling app state recovery response for %s", reqID)
131+
mutations, err := cli.appStateProc.ProcessRecovery(ctx, snapshot)
132+
if err != nil {
133+
cli.Log.Warnf("Failed to parse app state recovery blob for %s: %v", reqID, err)
134+
return true
135+
}
136+
err = cli.collectEventsToDispatch(ctx, name, mutations, true, eventsToDispatchPtr)
137+
if err != nil {
138+
cli.Log.Warnf("Failed to collect app state events for %s: %v", reqID, err)
139+
return true
140+
}
141+
eventsToDispatch = append(eventsToDispatch, &events.AppStateSyncComplete{Name: name, Version: version, Recovery: true})
142+
for _, evt := range eventsToDispatch {
143+
handlerFailed := cli.dispatchEvent(evt)
144+
if handlerFailed {
145+
return false
146+
}
147+
}
148+
cli.Log.Debugf("Finished handling app state recovery response for %s (%s to v%d)", reqID, name, version)
149+
return true
150+
}
151+
92152
func (cli *Client) applyAppStatePatches(
93153
ctx context.Context,
94154
name appstate.WAPatchName,
@@ -104,28 +164,36 @@ func (cli *Client) applyAppStatePatches(
104164
}
105165
return state, fmt.Errorf("failed to decode app state %s patches: %w", name, err)
106166
}
107-
wasFullSync := state.Version == 0 && patches.Snapshot != nil
108-
state = newState
109-
if name == appstate.WAPatchCriticalUnblockLow && wasFullSync && !cli.EmitAppStateEventsOnFullSync {
167+
return newState, cli.collectEventsToDispatch(ctx, name, mutations, fullSync, eventsToDispatch)
168+
}
169+
170+
func (cli *Client) collectEventsToDispatch(
171+
ctx context.Context,
172+
name appstate.WAPatchName,
173+
mutations []appstate.Mutation,
174+
fullSync bool,
175+
eventsToDispatch *[]any,
176+
) error {
177+
if name == appstate.WAPatchCriticalUnblockLow && fullSync && !cli.EmitAppStateEventsOnFullSync {
110178
var contacts []store.ContactEntry
111179
mutations, contacts = cli.filterContacts(mutations)
112180
cli.Log.Debugf("Mass inserting app state snapshot with %d contacts into the store", len(contacts))
113-
err = cli.Store.Contacts.PutAllContactNames(ctx, contacts)
181+
err := cli.Store.Contacts.PutAllContactNames(ctx, contacts)
114182
if err != nil {
115183
// This is a fairly serious failure, so just abort the whole thing
116-
return state, fmt.Errorf("failed to update contact store with data from snapshot: %v", err)
184+
return fmt.Errorf("failed to update contact store with data from snapshot: %v", err)
117185
}
118186
}
119187
for _, mutation := range mutations {
120188
if eventsToDispatch != nil && mutation.Operation == waServerSync.SyncdMutation_SET {
121189
*eventsToDispatch = append(*eventsToDispatch, &events.AppState{Index: mutation.Index, SyncActionValue: mutation.Action})
122190
}
123-
evt := cli.dispatchAppState(ctx, mutation, fullSync)
191+
evt := cli.dispatchAppState(ctx, name, mutation, fullSync)
124192
if eventsToDispatch != nil && evt != nil {
125193
*eventsToDispatch = append(*eventsToDispatch, evt)
126194
}
127195
}
128-
return state, nil
196+
return nil
129197
}
130198

131199
func (cli *Client) filterContacts(mutations []appstate.Mutation) ([]appstate.Mutation, []store.ContactEntry) {
@@ -147,8 +215,24 @@ func (cli *Client) filterContacts(mutations []appstate.Mutation) ([]appstate.Mut
147215
return filteredMutations, contacts
148216
}
149217

150-
func (cli *Client) dispatchAppState(ctx context.Context, mutation appstate.Mutation, fullSync bool) (eventToDispatch any) {
151-
zerolog.Ctx(ctx).Trace().Any("mutation", mutation).Msg("Dispatching app state mutation")
218+
func (cli *Client) dispatchAppState(ctx context.Context, name appstate.WAPatchName, mutation appstate.Mutation, fullSync bool) (eventToDispatch any) {
219+
logLevel := zerolog.TraceLevel
220+
log := zerolog.Ctx(ctx)
221+
if cli.AppStateDebugLogs && log.GetLevel() != zerolog.TraceLevel {
222+
logLevel = zerolog.DebugLevel
223+
}
224+
logEvt := log.WithLevel(logLevel).
225+
Str("patch_name", string(name)).
226+
Uint64("patch_version", mutation.PatchVersion).
227+
Stringer("operation", mutation.Operation).
228+
Int32("version", mutation.Version).
229+
Strs("index", mutation.Index).
230+
Hex("index_mac", mutation.IndexMAC).
231+
Hex("value_mac", mutation.ValueMAC)
232+
if logLevel == zerolog.TraceLevel {
233+
logEvt.Any("action", mutation.Action)
234+
}
235+
logEvt.Msg("Received app state mutation")
152236

153237
if mutation.Operation != waServerSync.SyncdMutation_SET {
154238
return
@@ -196,10 +280,31 @@ func (cli *Client) dispatchAppState(ctx context.Context, mutation appstate.Mutat
196280
}
197281
case appstate.IndexClearChat:
198282
act := mutation.Action.GetClearChatAction()
199-
eventToDispatch = &events.ClearChat{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
283+
var deleteMedia bool
284+
// TODO what's index 2 here?
285+
if len(mutation.Index) > 3 && mutation.Index[3] == "1" {
286+
deleteMedia = true
287+
}
288+
eventToDispatch = &events.ClearChat{
289+
JID: jid,
290+
Timestamp: ts,
291+
Action: act,
292+
DeleteMedia: deleteMedia,
293+
FromFullSync: fullSync,
294+
}
200295
case appstate.IndexDeleteChat:
201296
act := mutation.Action.GetDeleteChatAction()
202-
eventToDispatch = &events.DeleteChat{JID: jid, Timestamp: ts, Action: act, FromFullSync: fullSync}
297+
var deleteMedia bool
298+
if len(mutation.Index) > 2 && mutation.Index[2] == "1" {
299+
deleteMedia = true
300+
}
301+
eventToDispatch = &events.DeleteChat{
302+
JID: jid,
303+
Timestamp: ts,
304+
Action: act,
305+
DeleteMedia: deleteMedia,
306+
FromFullSync: fullSync,
307+
}
203308
case appstate.IndexStar:
204309
if len(mutation.Index) < 5 {
205310
return
@@ -371,12 +476,11 @@ func (cli *Client) requestAppStateKeys(ctx context.Context, rawKeyIDs [][]byte)
371476
},
372477
},
373478
}
374-
ownID := cli.getOwnID().ToNonAD()
375-
if ownID.IsEmpty() || len(debugKeyIDs) == 0 {
479+
if len(debugKeyIDs) == 0 {
376480
return
377481
}
378482
cli.Log.Infof("Sending key request for app state keys %+v", debugKeyIDs)
379-
_, err := cli.SendMessage(ctx, ownID, msg, SendRequestExtra{Peer: true})
483+
_, err := cli.SendPeerMessage(ctx, msg)
380484
if err != nil {
381485
cli.Log.Warnf("Failed to send app state key request: %v", err)
382486
}
@@ -499,3 +603,40 @@ func (cli *Client) MarkNotDirty(ctx context.Context, cleanType string, ts time.T
499603
})
500604
return err
501605
}
606+
607+
// BuildFatalAppStateExceptionNotification builds a message to request the user's primary device
608+
// to reset specific app state collections. This will cause all linked devices to be logged out.
609+
//
610+
// The built message can be sent using Client.SendPeerMessage.
611+
// There is no response, as the client will get logged out.
612+
func BuildFatalAppStateExceptionNotification(collections ...appstate.WAPatchName) *waE2E.Message {
613+
return &waE2E.Message{
614+
ProtocolMessage: &waE2E.ProtocolMessage{
615+
Type: waE2E.ProtocolMessage_APP_STATE_FATAL_EXCEPTION_NOTIFICATION.Enum(),
616+
AppStateFatalExceptionNotification: &waE2E.AppStateFatalExceptionNotification{
617+
CollectionNames: exslices.CastToString[string](collections),
618+
Timestamp: ptr.Ptr(time.Now().UnixMilli()),
619+
},
620+
},
621+
}
622+
}
623+
624+
// BuildAppStateRecoveryRequest builds a message to request the user's primary device to send
625+
// an unencrypted copy of the given app state collection.
626+
//
627+
// The built message can be sent using Client.SendPeerMessage.
628+
// The response will come as a ProtocolMessage with type `PEER_DATA_OPERATION_RESPONSE_MESSAGE`.
629+
func BuildAppStateRecoveryRequest(collection appstate.WAPatchName) *waE2E.Message {
630+
return &waE2E.Message{
631+
ProtocolMessage: &waE2E.ProtocolMessage{
632+
Type: waE2E.ProtocolMessage_PEER_DATA_OPERATION_REQUEST_MESSAGE.Enum(),
633+
PeerDataOperationRequestMessage: &waE2E.PeerDataOperationRequestMessage{
634+
PeerDataOperationRequestType: waE2E.PeerDataOperationRequestType_COMPANION_SYNCD_SNAPSHOT_FATAL_RECOVERY.Enum(),
635+
SyncdCollectionFatalRecoveryRequest: &waE2E.PeerDataOperationRequestMessage_SyncDCollectionFatalRecoveryRequest{
636+
CollectionName: (*string)(&collection),
637+
Timestamp: ptr.Ptr(time.Now().Unix()),
638+
},
639+
},
640+
},
641+
}
642+
}

0 commit comments

Comments
 (0)