-
Notifications
You must be signed in to change notification settings - Fork 63
/
Copy pathframed_encrypt_stream.ts
249 lines (220 loc) · 7.49 KB
/
framed_encrypt_stream.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
// Copyright Amazon.com Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
serializeFactory,
aadFactory,
MessageHeader,
Maximum,
} from '@aws-crypto/serialize'
// @ts-ignore
import { Transform as PortableTransform } from 'readable-stream'
import { Transform } from 'stream'
import {
GetCipher,
AwsEsdkJsCipherGCM,
needs,
NodeAlgorithmSuite,
} from '@aws-crypto/material-management-node'
const fromUtf8 = (input: string) => Buffer.from(input, 'utf8')
const serialize = serializeFactory(fromUtf8, { utf8Sorting: true })
const { finalFrameHeader, frameHeader } = serialize
const aadUtility = aadFactory(fromUtf8)
interface AccumulatingFrame {
contentLength: number
content: Buffer[]
sequenceNumber: number
}
interface EncryptFrame {
content: Buffer[]
bodyHeader: Buffer
headerSent?: boolean
cipher: AwsEsdkJsCipherGCM
isFinalFrame: boolean
}
const PortableTransformWithType = PortableTransform as new (
...args: any[]
) => Transform
const ioTick = async () => new Promise((resolve) => setImmediate(resolve))
const noop = () => {} // eslint-disable-line @typescript-eslint/no-empty-function
type ErrBack = (err?: Error) => void
export function getFramedEncryptStream(
getCipher: GetCipher,
messageHeader: MessageHeader,
dispose: () => void,
{
plaintextLength,
suite,
}: { plaintextLength?: number; suite: NodeAlgorithmSuite }
) {
let accumulatingFrame: AccumulatingFrame = {
contentLength: 0,
content: [],
sequenceNumber: 1,
}
let pathologicalDrain: (reason?: any) => void = noop
const { frameLength } = messageHeader
/* Precondition: plaintextLength must be within bounds.
* The Maximum.BYTES_PER_MESSAGE is set to be within Number.MAX_SAFE_INTEGER
* See serialize/identifiers.ts enum Maximum for more details.
*/
needs(
!plaintextLength ||
(plaintextLength >= 0 && Maximum.BYTES_PER_MESSAGE >= plaintextLength),
'plaintextLength out of bounds.'
)
/* Keeping the messageHeader, accumulatingFrame and pathologicalDrain private is the intention here.
* It is already unlikely that these values could be touched in the current composition of streams,
* but a different composition may change this.
* Since we are handling the plain text here, it seems prudent to take extra measures.
*/
return new (class FramedEncryptStream extends PortableTransformWithType {
_transform(chunk: Buffer, encoding: string, callback: ErrBack) {
const contentLeft = frameLength - accumulatingFrame.contentLength
/* Precondition: Must not process more than plaintextLength.
* The plaintextLength is the MAXIMUM value that can be encrypted.
*/
needs(
!plaintextLength || (plaintextLength -= chunk.length) >= 0,
'Encrypted data exceeded plaintextLength.'
)
/* Check for early return (Postcondition): Have not accumulated a frame. */
if (contentLeft > chunk.length) {
// eat more
accumulatingFrame.contentLength += chunk.length
accumulatingFrame.content.push(chunk)
return callback()
}
accumulatingFrame.contentLength += contentLeft
accumulatingFrame.content.push(chunk.slice(0, contentLeft))
// grab the tail
const tail = chunk.slice(contentLeft)
const encryptFrame = getEncryptFrame({
pendingFrame: accumulatingFrame,
messageHeader,
getCipher,
isFinalFrame: false,
suite,
})
// Reset frame state for next frame
const { sequenceNumber } = accumulatingFrame
accumulatingFrame = {
contentLength: 0,
content: [],
sequenceNumber: sequenceNumber + 1,
}
this._flushEncryptFrame(encryptFrame)
.then(() => this._transform(tail, encoding, callback))
.catch(callback)
}
_flush(callback: ErrBack) {
const encryptFrame = getEncryptFrame({
pendingFrame: accumulatingFrame,
messageHeader,
getCipher,
isFinalFrame: true,
suite,
})
this._flushEncryptFrame(encryptFrame)
.then(() => callback())
.catch(callback)
}
_destroy() {
dispose()
}
_read(size: number) {
super._read(size)
/* The _flushEncryptFrame encrypts and pushes the frame.
* If this.push returns false then this stream
* should wait until the destination stream calls read.
* This means that _flushEncryptFrame needs to wait for some
* indeterminate time. I create a closure around
* the resolution function for a promise that
* is created in _flushEncryptFrame. This way
* here in _read (the implementation of read)
* if a frame is being pushed, we can release
* it.
*/
pathologicalDrain()
pathologicalDrain = noop
}
async _flushEncryptFrame(encryptingFrame: EncryptFrame) {
const { content, cipher, bodyHeader, isFinalFrame } = encryptingFrame
this.push(bodyHeader)
let frameSize = 0
const cipherContent: Buffer[] = []
for (const clearChunk of content) {
const cipherText = cipher.update(clearChunk)
frameSize += cipherText.length
cipherContent.push(cipherText)
await ioTick()
}
/* Finalize the cipher and handle any tail. */
const tail = cipher.final()
frameSize += tail.length
cipherContent.push(tail)
/* Push the authTag onto the end. Yes, I am abusing the name. */
cipherContent.push(cipher.getAuthTag())
needs(
frameSize === frameLength || (isFinalFrame && frameLength >= frameSize),
'Malformed frame'
)
for (const cipherText of cipherContent) {
if (!this.push(cipherText)) {
/* back pressure: if push returns false, wait until _read
* has been called.
*/
await new Promise((resolve) => {
pathologicalDrain = resolve
})
}
}
if (isFinalFrame) this.push(null)
}
})()
}
type EncryptFrameInput = {
pendingFrame: AccumulatingFrame
messageHeader: MessageHeader
getCipher: GetCipher
isFinalFrame: boolean
suite: NodeAlgorithmSuite
}
export function getEncryptFrame(input: EncryptFrameInput): EncryptFrame {
const { pendingFrame, messageHeader, getCipher, isFinalFrame, suite } = input
const { sequenceNumber, contentLength, content } = pendingFrame
const { frameLength, contentType, messageId } = messageHeader
/* Precondition: The content length MUST correlate with the frameLength.
* In the case of a regular frame,
* the content length MUST strictly equal the frame length.
* In the case of the final frame,
* it MUST NOT be larger than the frame length.
*/
needs(
frameLength === contentLength ||
(isFinalFrame && frameLength >= contentLength),
`Malformed frame length and content length: ${JSON.stringify({
frameLength,
contentLength,
isFinalFrame,
})}`
)
const frameIv = serialize.frameIv(suite.ivLength, sequenceNumber)
const bodyHeader = Buffer.from(
isFinalFrame
? finalFrameHeader(sequenceNumber, frameIv, contentLength)
: frameHeader(sequenceNumber, frameIv)
)
const contentString = aadUtility.messageAADContentString({
contentType,
isFinalFrame,
})
const { buffer, byteOffset, byteLength } = aadUtility.messageAAD(
messageId,
contentString,
sequenceNumber,
contentLength
)
const cipher = getCipher(frameIv)
cipher.setAAD(Buffer.from(buffer, byteOffset, byteLength))
return { content, cipher, bodyHeader, isFinalFrame }
}