package mgo

import (
	"errors"
	"fmt"
	"reflect"
	"sync"
	"time"

	"github.com/izinga/mgo/bson"
)

// FullDocument is the value of the change stream "fullDocument" option,
// controlling how much of the changed document the server returns with each event.
type FullDocument string

// Supported values for the FullDocument option.
const (
	Default      = "default"
	UpdateLookup = "updateLookup"
)

// ChangeStream holds the state of an open change stream cursor against a
// collection, as returned by Collection.Watch.
type ChangeStream struct {
	iter           *Iter
	isClosed       bool
	options        ChangeStreamOptions
	pipeline       interface{}
	resumeToken    *bson.Raw
	collection     *Collection
	readPreference *ReadPreference
	err            error
	m              sync.Mutex
	sessionCopied  bool
}

// ChangeStreamOptions holds the options used to configure a change stream
// created by Collection.Watch.
type ChangeStreamOptions struct {

	// FullDocument controls the amount of data that the server will return when
	// returning a change document.
	FullDocument FullDocument

	// ResumeAfter specifies the logical starting point for the new change stream.
	ResumeAfter *bson.Raw

	// MaxAwaitTimeMS specifies the maximum amount of time for the server to wait
	// on new documents to satisfy a change stream query.
	MaxAwaitTimeMS time.Duration

	// BatchSize specifies the number of documents to return per batch.
	BatchSize int

	// Collation specifies the way the server should collate returned data.
	//TODO Collation *Collation
}
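
// An illustrative sketch of populating these options (the specific values are
// arbitrary, not defaults):
//
//    opts := ChangeStreamOptions{
//        FullDocument:   UpdateLookup,
//        MaxAwaitTimeMS: 5 * time.Second,
//        BatchSize:      100,
//    }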

var errMissingResumeToken = errors.New("resume token missing from result")

// Watch constructs a new ChangeStream capable of receiving continuing data
// from the database.
func (coll *Collection) Watch(pipeline interface{},
	options ChangeStreamOptions) (*ChangeStream, error) {

	if pipeline == nil {
		pipeline = []bson.M{}
	}

	csPipe := constructChangeStreamPipeline(pipeline, options)
	pipe := coll.Pipe(&csPipe)
	if options.MaxAwaitTimeMS > 0 {
		pipe.SetMaxTime(options.MaxAwaitTimeMS)
	}
	if options.BatchSize > 0 {
		pipe.Batch(options.BatchSize)
	}
	pIter := pipe.Iter()

	// check that there was no issue creating the iterator.
	// this will fail immediately with an error from the server if running against
	// a standalone.
	if err := pIter.Err(); err != nil {
		return nil, err
	}

	pIter.isChangeStream = true
	return &ChangeStream{
		iter:        pIter,
		collection:  coll,
		resumeToken: nil,
		options:     options,
		pipeline:    pipeline,
	}, nil
}
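
// A usage sketch (the collection variable and the filter value are illustrative,
// not part of this package): Watch can be combined with a user pipeline so that
// only matching events are delivered:
//
//    pipeline := []bson.M{{"$match": bson.M{"operationType": "insert"}}}
//    changeStream, err := collection.Watch(pipeline, ChangeStreamOptions{
//        FullDocument: UpdateLookup,
//    })
//    if err != nil {
//        return err
//    }
//    defer changeStream.Close()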

// Next retrieves the next document from the change stream, blocking if necessary.
// Next returns true if a document was successfully unmarshalled into result,
// and false if an error occurred. When Next returns false, the Err method should
// be called to check what error occurred during iteration. If there were no events
// available (ErrNotFound), the Err method returns nil so the user can retry the
// invocation.
//
// For example:
//
//    pipeline := []bson.M{}
//    var changeDoc bson.M
//
//    changeStream, err := collection.Watch(pipeline, ChangeStreamOptions{})
//    if err != nil {
//        return err
//    }
//    for changeStream.Next(&changeDoc) {
//        fmt.Printf("Change: %v\n", changeDoc)
//    }
//
//    if err := changeStream.Close(); err != nil {
//        return err
//    }
//
// If the pipeline used removes the _id field from the result, Next will error
// because the _id field is needed to resume iteration when an error occurs.
func (changeStream *ChangeStream) Next(result interface{}) bool {
	// The err field is constantly overwritten during iteration and we don't want
	// the user to read it mid-update, so we lock.
	changeStream.m.Lock()
	defer changeStream.m.Unlock()

	// If we are already in a state of error, don't continue.
	if changeStream.err != nil {
		return false
	}

	if changeStream.isClosed {
		changeStream.err = fmt.Errorf("illegal use of a closed ChangeStream")
		return false
	}

	var err error

	// Attempt to fetch the change stream result.
	err = changeStream.fetchResultSet(result)
	if err == nil {
		return true
	}

	// If we got no results, return false without an error so the user can call Next
	// again; resuming is not needed because the iterator simply timed out with no
	// events. The user can call Timeout to check whether this was the case.
	if err == ErrNotFound {
		return false
	}

	// Check whether the error is resumable.
	if !isResumableError(err) {
		// The error is not resumable: give up and return it to the user.
		changeStream.err = err
		return false
	}

	// Try to resume.
	err = changeStream.resume()
	if err != nil {
		// We were not able to resume and should only try once, so we give up.
		changeStream.err = err
		return false
	}

	// We have successfully resumed the change stream.
	// Try to fetch the next result.
	err = changeStream.fetchResultSet(result)
	if err != nil {
		changeStream.err = err
		return false
	}

	return true
}

// Err returns nil if no errors happened during iteration, or the actual
// error otherwise.
func (changeStream *ChangeStream) Err() error {
	changeStream.m.Lock()
	defer changeStream.m.Unlock()
	return changeStream.err
}

// Close kills the server cursor used by the iterator, if any, and returns
// nil if no errors happened during iteration, or the actual error otherwise.
func (changeStream *ChangeStream) Close() error {
	changeStream.m.Lock()
	defer changeStream.m.Unlock()
	changeStream.isClosed = true
	err := changeStream.iter.Close()
	if err != nil {
		changeStream.err = err
	}
	if changeStream.sessionCopied {
		changeStream.iter.session.Close()
		changeStream.sessionCopied = false
	}
	return err
}

// ResumeToken returns a copy of the current resume token held by the change stream.
// This token should be treated as an opaque token that can be provided to instantiate
// a new change stream.
func (changeStream *ChangeStream) ResumeToken() *bson.Raw {
	changeStream.m.Lock()
	defer changeStream.m.Unlock()
	if changeStream.resumeToken == nil {
		return nil
	}
	var tokenCopy = *changeStream.resumeToken
	return &tokenCopy
}
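
// A sketch of resuming across restarts with the token (persistToken and loadToken
// are hypothetical caller-provided helpers that store and return a *bson.Raw):
//
//    for changeStream.Next(&changeDoc) {
//        if token := changeStream.ResumeToken(); token != nil {
//            persistToken(token)
//        }
//    }
//
//    // Later, e.g. after a restart:
//    changeStream, err := collection.Watch(nil, ChangeStreamOptions{
//        ResumeAfter: loadToken(),
//    })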

// Timeout returns true if the last call of Next returned false because of an iterator timeout.
func (changeStream *ChangeStream) Timeout() bool {
	return changeStream.iter.Timeout()
}
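
// A sketch of the polling pattern described in the Next documentation: when Next
// returns false but Err is nil and Timeout reports true, no events were available
// and the caller may simply call Next again:
//
//    for {
//        if changeStream.Next(&changeDoc) {
//            fmt.Printf("Change: %v\n", changeDoc)
//            continue
//        }
//        if changeStream.Err() != nil || !changeStream.Timeout() {
//            break
//        }
//        // No events were available yet; loop and call Next again.
//    }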

func constructChangeStreamPipeline(pipeline interface{},
	options ChangeStreamOptions) interface{} {
	pipelinev := reflect.ValueOf(pipeline)

	// ensure that the pipeline passed in is a slice.
	if pipelinev.Kind() != reflect.Slice {
		panic("pipeline argument must be a slice")
	}

	// construct the options to be used by the change notification
	// pipeline stage.
	changeStreamStageOptions := bson.M{}
	if options.FullDocument != "" {
		changeStreamStageOptions["fullDocument"] = options.FullDocument
	}
	if options.ResumeAfter != nil {
		changeStreamStageOptions["resumeAfter"] = options.ResumeAfter
	}
	changeStreamStage := bson.M{"$changeStream": changeStreamStageOptions}

	pipeOfInterfaces := make([]interface{}, pipelinev.Len()+1)

	// insert the change notification pipeline stage at the beginning of the
	// aggregation.
	pipeOfInterfaces[0] = changeStreamStage

	// convert the passed in slice to a slice of interfaces.
	for i := 0; i < pipelinev.Len(); i++ {
		pipeOfInterfaces[1+i] = pipelinev.Index(i).Addr().Interface()
	}
	var pipelineAsInterface interface{} = pipeOfInterfaces
	return pipelineAsInterface
}
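
// To illustrate the shape of the pipeline this produces: for a user pipeline of
// []bson.M{{"$match": bson.M{"operationType": "insert"}}} and FullDocument set to
// UpdateLookup, the aggregation sent to the server is roughly equivalent to:
//
//    []interface{}{
//        bson.M{"$changeStream": bson.M{"fullDocument": "updateLookup"}},
//        bson.M{"$match": bson.M{"operationType": "insert"}},
//    }
//
// (user stages are appended by pointer, which does not change the documents the
// server receives).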

func (changeStream *ChangeStream) resume() error {
	// copy the information for the new socket.
	// Thanks to Copy() future uses will acquire a new socket against the newly selected DB.
	newSession := changeStream.iter.session.Copy()

	// fetch the cursor from the iterator and use it to run a killCursors
	// on the connection.
	cursorId := changeStream.iter.op.cursorId
	err := runKillCursorsOnSession(newSession, cursorId)
	if err != nil {
		return err
	}

	// change out the old connection to the database with the new connection.
	if changeStream.sessionCopied {
		changeStream.collection.Database.Session.Close()
	}
	changeStream.collection.Database.Session = newSession
	changeStream.sessionCopied = true

	opts := changeStream.options
	if changeStream.resumeToken != nil {
		opts.ResumeAfter = changeStream.resumeToken
	}

	// make a new pipeline containing the resume token.
	changeStreamPipeline := constructChangeStreamPipeline(changeStream.pipeline, opts)

	// generate the new iterator with the new connection.
	newPipe := changeStream.collection.Pipe(changeStreamPipeline)
	changeStream.iter = newPipe.Iter()
	if err := changeStream.iter.Err(); err != nil {
		return err
	}
	changeStream.iter.isChangeStream = true
	return nil
}

// fetchResumeToken unmarshals the _id field from the document, returning an
// error if the field is missing or cannot be unmarshalled.
func (changeStream *ChangeStream) fetchResumeToken(rawResult *bson.Raw) error {
	changeStreamResult := struct {
		ResumeToken *bson.Raw `bson:"_id,omitempty"`
	}{}

	err := rawResult.Unmarshal(&changeStreamResult)
	if err != nil {
		return err
	}

	if changeStreamResult.ResumeToken == nil {
		return errMissingResumeToken
	}

	changeStream.resumeToken = changeStreamResult.ResumeToken
	return nil
}

func (changeStream *ChangeStream) fetchResultSet(result interface{}) error {
	rawResult := bson.Raw{}

	// fetch the next set of documents from the cursor.
	gotNext := changeStream.iter.Next(&rawResult)
	err := changeStream.iter.Err()
	if err != nil {
		return err
	}

	if !gotNext && err == nil {
		// If the iter.Err() method returns nil despite us not getting a next batch,
		// it is because iter.Err() silences this case.
		return ErrNotFound
	}

	// grab the resumeToken from the results.
	if err := changeStream.fetchResumeToken(&rawResult); err != nil {
		return err
	}

	// put the raw results into the data structure the user provided.
	if err := rawResult.Unmarshal(result); err != nil {
		return err
	}
	return nil
}

func isResumableError(err error) bool {
	_, isQueryError := err.(*QueryError)
	// The error is resumable if it is not a database error, or if it is a database
	// error that is a notMaster error, and in either case is not a
	// missingResumeToken error (which is caused by the user-provided pipeline).
	return (!isQueryError || isNotMasterError(err)) && (err != errMissingResumeToken)
}

func runKillCursorsOnSession(session *Session, cursorId int64) error {
	socket, err := session.acquireSocket(true)
	if err != nil {
		return err
	}
	// Release the socket on every return path, including query failure.
	defer socket.Release()

	err = socket.Query(&killCursorsOp{[]int64{cursorId}})
	if err != nil {
		return err
	}

	return nil
}