-
Notifications
You must be signed in to change notification settings - Fork 0
/
MongoExtensions.cs
384 lines (361 loc) · 17.7 KB
/
MongoExtensions.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
using System.Collections.Generic;
using System.Dynamic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using MongoDB.Driver;
using MongoDB.Bson;
using System.Collections;
using Newtonsoft.Json.Linq;
using System;
using System.Reflection;
using MongoDB.Bson.Serialization.Attributes;
using System.Linq.Expressions;
using CSharpExtensions.OpenSource;
namespace CSharpExtensions.OpenSource.Mongo
{
public static class MongoExtensions
{
/// <summary>
/// Opens a typed collection handle from a connection string, a database name and a collection name.
/// </summary>
/// <typeparam name="T">Document type the collection is mapped to.</typeparam>
/// <param name="connectionString">Connection string handed to the <see cref="MongoClient"/> constructor.</param>
/// <param name="dbName">Name of the database to open.</param>
/// <param name="collectionName">Name of the collection to open.</param>
/// <returns>The collection handle returned by the driver.</returns>
public static IMongoCollection<T> GetMongoCollection<T>(this string connectionString, string dbName, string collectionName)
{
    // A new MongoClient is constructed on every call.
    var mongoClient = new MongoClient(connectionString);
    return mongoClient.GetDatabase(dbName).GetCollection<T>(collectionName);
}
/// <summary>
/// Builds an accessor expression of the shape <c>x =&gt; (object)x.Member</c> for the named
/// property or field of <typeparamref name="T"/>.
/// </summary>
/// <param name="propName">Name of the property or field to read.</param>
/// <returns>A lambda expression that reads the member and converts it to <see cref="object"/>.</returns>
private static Expression<Func<T, object>> PropToLambda<T>(string propName)
{
    var source = Expression.Parameter(typeof(T));
    // Convert to object so that any member type fits the Func<T, object> signature.
    var boxedMember = Expression.Convert(Expression.PropertyOrField(source, propName), typeof(object));
    return Expression.Lambda<Func<T, object>>(boxedMember, source);
}
/// <summary>
/// Renders a filter definition with the globally registered serializer for <typeparamref name="T"/>
/// and converts the rendered document to JSON through <c>BsonToJson</c>.
/// </summary>
/// <param name="filter">Filter definition to render.</param>
/// <returns>The JSON text produced for the rendered filter.</returns>
public static string? RenderToJson<T>(this FilterDefinition<T> filter)
{
    var registry = MongoDB.Bson.Serialization.BsonSerializer.SerializerRegistry;
    var rendered = filter.Render(registry.GetSerializer<T>(), registry);
    return rendered.BsonToJson();
}
/// <summary>
/// Builds one update definition that writes every property selected by <c>GetPropsToUpdate</c>,
/// using either <c>Set</c> or <c>SetOnInsert</c> for all of them.
/// </summary>
/// <param name="updateBuilder">Builder used to create the initial definition.</param>
/// <param name="setOnInsert">When true each property is written with <c>SetOnInsert</c>; otherwise with <c>Set</c>.</param>
/// <param name="obj">Object whose property values are copied into the update.</param>
/// <param name="setIfNotEmpty">Forwarded to <c>GetPropsToUpdate</c>.</param>
/// <param name="setIfNotNull">Forwarded to <c>GetPropsToUpdate</c>.</param>
/// <param name="excludeProps">Property names forwarded to <c>GetPropsToUpdate</c> for exclusion.</param>
/// <returns>The combined update definition.</returns>
private static UpdateDefinition<T> SetOrSetOnInsertAllProps<T>(this UpdateDefinitionBuilder<T> updateBuilder, bool setOnInsert, T obj, bool setIfNotEmpty = false, bool setIfNotNull = false, params string[] excludeProps) where T : class
{
    var members = obj.GetPropsToUpdate(setIfNotEmpty, setIfNotNull, excludeProps);
    // Seed definition that the per-property operators are chained onto.
    // NOTE(review): this presumably adds an unset of a field named " " to every update - confirm that is intended.
    var combined = updateBuilder.Unset(" ");
    foreach (var member in members)
    {
        var accessor = PropToLambda<T>(member.Name);
        var current = accessor.Compile()(obj);
        // Properties with a public setter are addressed through the expression,
        // the others through their plain name.
        var hasPublicSetter = member.GetSetMethod() != null;
        if (setOnInsert)
        {
            combined = hasPublicSetter
                ? combined.SetOnInsert(accessor, current)
                : combined.SetOnInsert(member.Name, current);
        }
        else
        {
            combined = hasPublicSetter
                ? combined.Set(accessor, current)
                : combined.Set(member.Name, current);
        }
    }
    return combined;
}
/// <summary>
/// Builds an update definition that applies <c>SetOnInsert</c> to every property of
/// <paramref name="obj"/> selected by <c>GetPropsToUpdate</c>.
/// </summary>
/// <param name="updateBuilder">Builder used to create the definition.</param>
/// <param name="obj">Object whose property values are written.</param>
/// <param name="setIfNotEmpty">Forwarded to the property selection.</param>
/// <param name="setIfNotNull">Forwarded to the property selection.</param>
/// <param name="excludeProps">Names of properties to leave out.</param>
/// <returns>The combined update definition.</returns>
public static UpdateDefinition<T> SetOnInsertAllProps<T>(this UpdateDefinitionBuilder<T> updateBuilder, T obj, bool setIfNotEmpty = false, bool setIfNotNull = false, params string[] excludeProps) where T : class
{
    return updateBuilder.SetOrSetOnInsertAllProps(true, obj, setIfNotEmpty, setIfNotNull, excludeProps);
}
/// <summary>
/// Builds an update definition that applies <c>Set</c> to every property of
/// <paramref name="obj"/> selected by <c>GetPropsToUpdate</c>.
/// </summary>
/// <param name="updateBuilder">Builder used to create the definition.</param>
/// <param name="obj">Object whose property values are written.</param>
/// <param name="setIfNotEmpty">Forwarded to the property selection.</param>
/// <param name="setIfNotNull">Forwarded to the property selection.</param>
/// <param name="excludeProps">Names of properties to leave out.</param>
/// <returns>The combined update definition.</returns>
public static UpdateDefinition<T> SetAllProps<T>(this UpdateDefinitionBuilder<T> updateBuilder, T obj, bool setIfNotEmpty = false, bool setIfNotNull = false, params string[] excludeProps) where T : class
{
    return updateBuilder.SetOrSetOnInsertAllProps(false, obj, setIfNotEmpty, setIfNotNull, excludeProps);
}
/// <summary>
/// Selects the properties of <paramref name="obj"/> that should be written by an update.
/// </summary>
/// <remarks>
/// Properties listed in <paramref name="excludeProps"/> or marked with <see cref="BsonIgnoreAttribute"/>
/// are always dropped. Of the remaining ones a property is skipped when:
/// its value is null and either <paramref name="setIfNotNull"/> is set or it carries
/// <see cref="BsonIgnoreIfNullAttribute"/>; its value is null or a blank string and
/// <paramref name="setIfNotEmpty"/> is set; or it carries <see cref="BsonIgnoreIfDefaultAttribute"/>
/// and holds the default value of its declared type.
/// </remarks>
/// <param name="obj">Object to inspect; its runtime type supplies the property list.</param>
/// <param name="setIfNotEmpty">Skip properties whose value is null or a blank string.</param>
/// <param name="setIfNotNull">Skip properties whose value is null.</param>
/// <param name="excludeProps">Names of properties to leave out.</param>
/// <returns>The selected properties, one per distinct name.</returns>
public static List<PropertyInfo> GetPropsToUpdate<T>(this T obj, bool setIfNotEmpty = false, bool setIfNotNull = false, params string[] excludeProps) where T : class
{
    var candidates = obj.GetType().PowerfulGetProperties()
        .Where(x => !excludeProps.Contains(x.Name))
        .Where(x => x.GetCustomAttribute(typeof(BsonIgnoreAttribute), true) == null)
        .ToList();
    var res = new List<PropertyInfo>();
    foreach (var prop in candidates)
    {
        var val = prop.GetValue(obj);
        // [BsonIgnore] members were already removed above, so testing for that attribute here
        // could never match; the null rule is driven by [BsonIgnoreIfNull].
        var shouldIgnoreIfNull = setIfNotNull || prop.GetCustomAttribute(typeof(BsonIgnoreIfNullAttribute), true) != null;
        var shouldIgnoreIfDefault = prop.GetCustomAttribute(typeof(BsonIgnoreIfDefaultAttribute), true) != null;
        if (val == null && shouldIgnoreIfNull)
        {
            continue;
        }
        if (setIfNotEmpty && (val == null || (val is string str && string.IsNullOrWhiteSpace(str))))
        {
            continue;
        }
        // The value is boxed, so comparing it with `default` would only ever detect null.
        if (shouldIgnoreIfDefault && IsTypeDefault(val, prop.PropertyType))
        {
            continue;
        }
        res.Add(prop);
    }
    return res.DistinctBy(x => x.Name).ToList();
}

// Tells whether a boxed value equals the default of the type the member is declared with.
private static bool IsTypeDefault(object? value, Type declaredType)
{
    if (value == null)
    {
        return true;
    }
    if (!declaredType.IsValueType)
    {
        return false;
    }
    // Activator returns null for Nullable<T>; its default (null) was already handled above.
    var typeDefault = Activator.CreateInstance(declaredType);
    return typeDefault != null && typeDefault.Equals(value);
}
/// <summary>
/// Removes duplicate documents: for every document visited, all documents with the same
/// <paramref name="uniqueKey"/> value and a smaller <paramref name="mongoId"/> are deleted.
/// </summary>
/// <remarks>
/// The collection is walked in pages of 1000 sorted by <c>_id</c> descending. Paging is done
/// with an "id less than the last one seen" filter rather than a skip offset: deleting documents
/// while paging by offset shifts the remaining documents, so some would never be visited and
/// their duplicates would survive.
/// NOTE(review): this assumes <paramref name="mongoId"/> selects the member mapped to <c>_id</c>,
/// as the sort on <c>_id</c> already implies - confirm against callers.
/// </remarks>
/// <param name="mongoCollection">Collection to clean up.</param>
/// <param name="uniqueKey">Selector of the value that must be unique.</param>
/// <param name="mongoId">Selector of the document id used to decide which duplicate is kept.</param>
/// <param name="ct">Checked before each page; when cancellation is requested the method returns without throwing.</param>
/// <exception cref="InvalidOperationException">A visited document yields a null id, so paging cannot continue.</exception>
public static async Task RemoveDuplicatesAsync<T, S>(this IMongoCollection<T> mongoCollection, Expression<Func<T, S>> uniqueKey, Expression<Func<T, ObjectId?>> mongoId, CancellationToken ct = default)
{
    const int pageSize = 1000;
    // Compile the selectors once instead of once per document.
    var readKey = uniqueKey.Compile();
    var readId = mongoId.Compile();
    var newestFirst = Builders<T>.Sort.Descending("_id");
    var pageFilter = Builders<T>.Filter.Empty;
    while (!ct.IsCancellationRequested)
    {
        var page = await mongoCollection.Find(pageFilter).Sort(newestFirst).Limit(pageSize).ToListAsync();
        if (page.Count == 0) { return; }
        var bulkDel = new List<DeleteManyModel<T>>(page.Count);
        foreach (var item in page)
        {
            bulkDel.Add(new(Builders<T>.Filter.And(
                Builders<T>.Filter.Eq(uniqueKey, readKey(item)),
                Builders<T>.Filter.Lt(mongoId, readId(item))
            )));
        }
        await mongoCollection.BulkWriteAsync(bulkDel, new() { IsOrdered = true });
        // Continue strictly below the smallest id of this page; this stays correct no matter
        // how many documents the deletes above removed.
        var oldestId = readId(page[page.Count - 1]);
        if (oldestId == null)
        {
            throw new InvalidOperationException("Cannot continue paging: the id selector returned null for a document.");
        }
        pageFilter = Builders<T>.Filter.Lt(mongoId, oldestId);
    }
}
/// <summary>
/// Rewrites enum fields that are stored as numbers into their string names.
/// </summary>
/// <remarks>
/// For every selected member and every value of its enum type, one update-many operation is
/// queued: documents whose field equals the numeric value get the field set to the value's name.
/// All operations are sent in a single ordered bulk write.
/// </remarks>
/// <param name="col">Collection to update.</param>
/// <param name="propsExp">Selectors of the enum members to convert, for example <c>x =&gt; x.Status</c>.</param>
/// <returns>The bulk write result, or null when no operation was queued.</returns>
/// <exception cref="NotSupportedException">A selector is not a plain property or field access.</exception>
public static async Task<BulkWriteResult<T>?> EnumsNumbersToStrings<T>(this IMongoCollection<T> col, params Expression<Func<T, Enum?>>[] propsExp)
{
    var bulk = new List<UpdateManyModel<T>>();
    foreach (var propExp in propsExp)
    {
        // The selector returns Enum?, so the member access is wrapped in conversion nodes.
        // Peel them off instead of hard-casting, which failed with InvalidCastException
        // before the intended error could be raised.
        var body = propExp.Body;
        while (body is UnaryExpression wrapper)
        {
            body = wrapper.Operand;
        }
        var expression = body as MemberExpression ?? throw new NotSupportedException($"Expression '{propExp}' not supported.");
        var propName = expression.Member.Name;
        var propType = (expression.Member as PropertyInfo)?.PropertyType ?? (expression.Member as FieldInfo)?.FieldType ?? throw new NotSupportedException($"Expression Member '{expression.Member}' not supported.");
        propType = Nullable.GetUnderlyingType(propType) ?? propType;
        var underlyingType = Enum.GetUnderlyingType(propType);
        var lamda = PropToLambda<T>(propName);
        foreach (var item in Enum.GetValues(propType))
        {
            // Convert through the enum's own underlying type: unboxing straight to int
            // throws for enums backed by byte, long, and so on.
            var numericVal = Convert.ChangeType(item, underlyingType);
            var stringVal = item.ToString()!;
            var filter = Builders<T>.Filter.Eq(lamda, numericVal);
            var update = Builders<T>.Update.Set(lamda, stringVal);
            bulk.Add(new(filter, update));
        }
    }
    if (bulk.Count > 0) { return await col.BulkWriteAsync(bulk, new() { IsOrdered = true }); }
    return null;
}
/// <summary>
/// Streams the documents matching <paramref name="filter"/> page by page.
/// </summary>
/// <remarks>
/// Each page is fetched with its own query using <paramref name="sort"/>, a skip offset and a
/// limit of <paramref name="size"/>. Enumeration ends when a query returns no documents.
/// With <paramref name="disableSkip"/> set every query starts at offset 0, so the stream only
/// ends if the caller changes the matched documents between pages so that they stop matching.
/// </remarks>
/// <param name="col">Collection to read.</param>
/// <param name="filter">Filter applied to every page query and to the count.</param>
/// <param name="sort">Sort applied to every page query.</param>
/// <param name="size">Maximum number of documents per page; must be positive.</param>
/// <param name="disableSkip">When true no skip offset is applied.</param>
/// <param name="updateTotalCount">When true (and counting is enabled) the total is re-counted after every page.</param>
/// <param name="disableCount">When true no count query is issued and totals are left out of the log lines.</param>
/// <param name="logger">Optional sink for progress messages.</param>
/// <param name="ct">Token passed to the count and page queries.</param>
/// <returns>An asynchronous sequence of non-empty pages.</returns>
/// <exception cref="ArgumentOutOfRangeException"><paramref name="size"/> is zero or negative.</exception>
public static async IAsyncEnumerable<List<T>> Pagination<T>
(
    this IMongoCollection<T> col,
    FilterDefinition<T> filter,
    SortDefinition<T> sort,
    int size = 1000,
    bool disableSkip = false,
    bool updateTotalCount = false,
    bool disableCount = false,
    Action<string>? logger = null,
    [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken ct = default
)
{
    // A limit of 0 means "no limit" to the server, which would return everything on every
    // iteration and never reach an empty page.
    if (size <= 0)
    {
        throw new ArgumentOutOfRangeException(nameof(size), size, "Page size must be positive.");
    }
    logger ??= _ => { };
    var totalCount = disableCount ? -1 : await col.CountDocumentsAsync(filter, cancellationToken: ct);
    var index = 0;
    logger($"MongoPagination - {col.CollectionNamespace.CollectionName} - {(totalCount > 0 ? $"{totalCount} items, " : "")}{size} per get");
    var totalExec = 0;
    while (true)
    {
        var data = await col.Find(filter)
            .Sort(sort)
            .Skip(disableSkip ? 0 : index * size)
            .Limit(size)
            .ToListAsync(ct);
        if (data.Count == 0) { yield break; }
        totalExec += data.Count;
        if (!disableCount && updateTotalCount) { totalCount = await col.CountDocumentsAsync(filter, cancellationToken: ct); }
        logger($"MongoPagination - {col.CollectionNamespace.CollectionName} - {totalExec}{(totalCount > 0 ? $"/{totalCount}" : "")}");
        yield return data;
        index++;
    }
}
/// <summary>
/// Streams documents one at a time by repeatedly running find-one-and-update with
/// <paramref name="filter"/> and <paramref name="update"/>.
/// </summary>
/// <remarks>
/// The stream ends as soon as a call finds no matching document. For that to happen,
/// <paramref name="update"/> has to change each document so that it no longer matches
/// <paramref name="filter"/>; otherwise the same document keeps being returned.
/// The total used in the log lines is re-counted at most once per minute.
/// </remarks>
/// <param name="col">Collection to read and update.</param>
/// <param name="filter">Filter selecting the documents still to be processed.</param>
/// <param name="update">Update applied to each document as it is fetched.</param>
/// <param name="sort">Optional order in which matching documents are picked.</param>
/// <param name="logger">Optional sink for progress messages.</param>
/// <returns>An asynchronous sequence of the documents returned by the driver.</returns>
public static async IAsyncEnumerable<T> PaginationWithUpdate<T>
(
    this IMongoCollection<T> col,
    FilterDefinition<T> filter,
    UpdateDefinition<T> update,
    SortDefinition<T>? sort = null,
    Action<string>? logger = null
)
{
    logger ??= _ => { };
    var totalCount = await col.CountDocumentsAsync(filter);
    logger($"MongoPaginationWithUpdate - {col.CollectionNamespace.CollectionName} - {totalCount} items");
    var totalExec = 0;
    var lastTotalSync = DateTime.UtcNow;
    while (true)
    {
        var data = await col.FindOneAndUpdateAsync(filter, update, new FindOneAndUpdateOptions<T, T> { Sort = sort });
        // Nothing matched: the work is done. Without this exit the loop kept calling the
        // server forever once the filter was exhausted.
        if (data == null) { yield break; }
        yield return data;
        totalExec++;
        if ((DateTime.UtcNow - lastTotalSync).TotalMinutes >= 1)
        {
            totalCount = await col.CountDocumentsAsync(filter);
            lastTotalSync = DateTime.UtcNow;
        }
        logger($"MongoPaginationWithUpdate - {col.CollectionNamespace.CollectionName} - {totalExec}/{totalCount}");
    }
}
/// <summary>
/// Reads every document matching <paramref name="filter"/> through <c>Pagination</c>
/// and gathers all pages into one list.
/// </summary>
/// <param name="col">Collection to read.</param>
/// <param name="filter">Filter forwarded to <c>Pagination</c>.</param>
/// <param name="sort">Sort forwarded to <c>Pagination</c>.</param>
/// <param name="size">Page size forwarded to <c>Pagination</c>.</param>
/// <param name="disableSkip">Forwarded to <c>Pagination</c>.</param>
/// <returns>All documents from all pages, in the order the pages were produced.</returns>
public static async Task<List<T>> GetAllByBulks<T>(this IMongoCollection<T> col, FilterDefinition<T> filter, SortDefinition<T> sort, int size = 1000, bool disableSkip = false)
{
    var everything = new List<T>();
    await using var pages = col.Pagination(filter, sort, size, disableSkip).GetAsyncEnumerator();
    while (await pages.MoveNextAsync())
    {
        everything.AddRange(pages.Current);
    }
    return everything;
}
/// <summary>
/// Turns JSON text describing an array of stage documents into a pipeline definition.
/// </summary>
/// <typeparam name="T">Input document type of the pipeline.</typeparam>
/// <typeparam name="S">Output document type of the pipeline.</typeparam>
/// <param name="pipelineJson">JSON text deserialized as an array of <see cref="BsonDocument"/>.</param>
/// <returns>The pipeline definition created from the deserialized stages.</returns>
public static PipelineDefinition<T, S> GetPipelineDefinition<T, S>(this string pipelineJson)
{
    BsonDocument[] stages = MongoDB.Bson.Serialization.BsonSerializer.Deserialize<BsonDocument[]>(pipelineJson);
    return PipelineDefinition<T, S>.Create(stages);
}
/// <summary>
/// Runs an aggregation whose pipeline is given as JSON text and returns the raw result cursor.
/// </summary>
/// <param name="col">Collection to aggregate over.</param>
/// <param name="aggQuery">Pipeline JSON, converted with <c>GetPipelineDefinition</c>.</param>
/// <param name="options">Optional aggregate options passed to the driver.</param>
/// <param name="cancellationToken">Token passed to the driver.</param>
/// <returns>The task returned by the driver, yielding a cursor of <see cref="BsonDocument"/>.</returns>
public static Task<IAsyncCursor<BsonDocument>> AggregateAsync<T>(this IMongoCollection<T> col, string aggQuery, AggregateOptions? options = null, CancellationToken cancellationToken = default)
    => col.AggregateAsync(aggQuery.GetPipelineDefinition<T, BsonDocument>(), options, cancellationToken);
/// <summary>
/// Runs an aggregation whose pipeline is given as JSON text and returns the raw result cursor.
/// Synchronous counterpart of <c>AggregateAsync</c>.
/// </summary>
/// <param name="col">Collection to aggregate over.</param>
/// <param name="aggQuery">Pipeline JSON: an array of stage documents.</param>
/// <param name="options">Optional aggregate options passed to the driver.</param>
/// <param name="cancellationToken">Token passed to the driver.</param>
/// <returns>A cursor of <see cref="BsonDocument"/> results.</returns>
public static IAsyncCursor<BsonDocument> Aggregate<T>(this IMongoCollection<T> col, string aggQuery, AggregateOptions? options = null, CancellationToken cancellationToken = default)
{
    // Parse the JSON the same way AggregateAsync does. BsonArray.Create(string) does not
    // parse JSON text, so the previous code could not produce stage documents from it.
    var pipelineDefinition = aggQuery.GetPipelineDefinition<T, BsonDocument>();
    return col.Aggregate(pipelineDefinition, options, cancellationToken);
}
/// <summary>
/// Loads every document of the collection into a list.
/// </summary>
/// <param name="collection">Collection to read.</param>
/// <returns>A task yielding all documents.</returns>
public static Task<List<T>> GetAllAsync<T>(this IMongoCollection<T> collection)
{
    // The predicate accepts every document.
    return collection.Find(_ => true).ToListAsync();
}
/// <summary>
/// Converts a value to a dynamic object graph by going through JSON.
/// </summary>
/// <remarks>
/// A <see cref="BsonDocument"/> is written as strict-mode JSON, a string is taken as the JSON
/// text itself, and anything else is serialized with Json.NET. Text that starts with <c>[</c>
/// is first read as an array of <see cref="ExpandoObject"/>; other text as a single
/// <see cref="ExpandoObject"/>. If that typed read fails, the text is deserialized without a
/// target type instead.
/// </remarks>
/// <param name="value">Value to convert.</param>
/// <returns>The dynamic representation, or null when <paramref name="value"/> is null.</returns>
public static dynamic? ToDynamic<T>(this T? value)
{
    if (value == null) { return null; }
    var jsonSettings = GenericsExtensions.GetJsonSerializerSettings(TypeNameHandling.None);
    string? json;
    if (value is BsonDocument)
    {
        var jsonWriterSettings = new MongoDB.Bson.IO.JsonWriterSettings { OutputMode = MongoDB.Bson.IO.JsonOutputMode.Strict };
        json = value.ToJson(jsonWriterSettings);
    }
    else
    {
        json = value is string ? value.ToString() : JsonConvert.SerializeObject(value, jsonSettings);
    }
    // An empty string has no first character; indexing it used to throw.
    var looksLikeArray = !string.IsNullOrEmpty(json) && json[0] == '[';
    if (looksLikeArray)
    {
        try
        {
            return JsonConvert.DeserializeObject<ExpandoObject[]>(json!, jsonSettings)!;
        }
        catch
        {
            // Deliberate fallback: the array does not consist of objects.
            return JsonConvert.DeserializeObject(json!, jsonSettings)!;
        }
    }
    try
    {
        return JsonConvert.DeserializeObject<ExpandoObject>(json ?? "{}", jsonSettings)!;
    }
    catch
    {
        // Deliberate fallback: the text is not a JSON object.
        return JsonConvert.DeserializeObject(json ?? "{}", jsonSettings)!;
    }
}
/// <summary>
/// Converts a value to BSON by going through JSON.
/// </summary>
/// <remarks>
/// Any <see cref="IEnumerable"/> other than an <see cref="ExpandoObject"/> becomes a
/// <see cref="BsonArray"/> whose elements are converted recursively. Everything else is
/// serialized with Json.NET, stripped of <c>$type</c> properties and parsed as a
/// <see cref="BsonDocument"/>.
/// NOTE(review): strings also implement IEnumerable and therefore take the array branch - confirm that is intended.
/// </remarks>
/// <param name="value">Value to convert.</param>
/// <returns>A <see cref="BsonArray"/> for sequences, otherwise a <see cref="BsonDocument"/>.</returns>
public static BsonValue ToBsonValue<T>(this T value)
{
    var jsonSettings = GenericsExtensions.GetJsonSerializerSettings(TypeNameHandling.None);
    if (value is ExpandoObject || !(value is IEnumerable items))
    {
        var serialized = JsonConvert.SerializeObject(value, jsonSettings);
        var json = JToken.Parse(serialized).RemovePropRecursive("$type").ToString();
        return BsonDocument.Parse(json);
    }
    var converted = new BsonArray();
    foreach (var element in items)
    {
        converted.Add(ToBsonValue(element));
    }
    return converted;
}
/// <summary>
/// Lists the names of all indexes defined on the collection.
/// </summary>
/// <param name="col">Collection whose indexes are listed.</param>
/// <returns>The <c>name</c> value of every index description returned by the driver.</returns>
public static async Task<List<string>> GetIndexesNames<T>(this IMongoCollection<T> col)
{
    var cursor = await col.Indexes.ListAsync();
    var descriptions = await cursor.ToListAsync();
    return descriptions.Select(x => x["name"].AsString).ToList();
}
/// <summary>
/// Creates the given indexes, skipping those whose name already exists on the collection.
/// </summary>
/// <remarks>
/// Every model must carry an explicit index name. With
/// <paramref name="dropIndexesThatNotInTheList"/> set, existing indexes whose name is not in the
/// list (other than <c>_id</c> / <c>_id_</c>) are dropped first. With
/// <paramref name="forceUpdate"/> set, creation is attempted even for names that already exist.
/// If creation fails with an error containing "Index must have unique name", the index is
/// dropped and created again.
/// </remarks>
/// <param name="col">Collection to manage indexes on.</param>
/// <param name="createIndexModels">Indexes that should exist.</param>
/// <param name="forceUpdate">Attempt creation even when an index with the same name exists.</param>
/// <param name="dropIndexesThatNotInTheList">Drop existing indexes that are not named in the list.</param>
/// <exception cref="ArgumentException">A model has no options or no index name.</exception>
public static async Task CreateIndexesIfNotExists<T>
(
    this IMongoCollection<T> col,
    List<CreateIndexModel<T>> createIndexModels,
    bool forceUpdate = false,
    bool dropIndexesThatNotInTheList = false
)
{
    // Materialize once: the names are checked repeatedly below. Options may be null when a
    // model was built without options, which must end in the validation error, not an NRE.
    var names = new HashSet<string?>(createIndexModels.Select(x => x.Options?.Name));
    if (names.Any(x => string.IsNullOrEmpty(x)))
    {
        throw new ArgumentException("Must Set CreateIndexModel -> Option -> Name");
    }
    if (dropIndexesThatNotInTheList)
    {
        var allIndexes = await col.GetIndexesNames();
        foreach (var index in allIndexes.Where(x => x != "_id" && x != "_id_"))
        {
            if (!names.Contains(index))
            {
                Console.WriteLine($"{col.CollectionNamespace.CollectionName} - Droping Index {index}");
                await col.Indexes.DropOneAsync(index);
            }
        }
    }
    // Re-read after the optional drop so the skip check sees the current state.
    var existingIndexes = await col.GetIndexesNames();
    foreach (var createIndexModel in createIndexModels)
    {
        if (!forceUpdate && existingIndexes.Contains(createIndexModel.Options.Name))
        {
            continue;
        }
        try
        {
            await col.Indexes.CreateOneAsync(createIndexModel);
        }
        catch (Exception ex) when (ex.ToString().Contains("Index must have unique name"))
        {
            // Replace the conflicting index with the requested definition.
            await col.Indexes.DropOneAsync(createIndexModel.Options.Name);
            await col.Indexes.CreateOneAsync(createIndexModel);
        }
    }
}
/// <summary>
/// Round-trips a value through <c>BsonToJson</c>, <c>ToCleanJson</c> and BSON deserialization
/// and returns the re-created instance.
/// </summary>
/// <param name="value">Value to round-trip.</param>
/// <returns>A new <typeparamref name="T"/> deserialized from the cleaned JSON.</returns>
public static T CleanupBson<T>(this T value)
{
    var cleanedJson = value.BsonToJson().ToCleanJson();
    var document = MongoDB.Bson.BsonDocument.Parse(cleanedJson);
    return MongoDB.Bson.Serialization.BsonSerializer.Deserialize<T>(document);
}
/// <summary>
/// Serializes a value with the BSON JSON writer, then re-reads the text as a
/// <see cref="JObject"/>, removes duplicate keys and serializes it again.
/// </summary>
/// <param name="value">Value to serialize.</param>
/// <param name="format">Sets the <c>Indent</c> option of the BSON JSON writer.</param>
/// <param name="jsonOutputMode">Output mode of the BSON JSON writer.</param>
/// <returns>The resulting JSON text.</returns>
public static string BsonToJson<T>(this T value, bool format = false, MongoDB.Bson.IO.JsonOutputMode jsonOutputMode = MongoDB.Bson.IO.JsonOutputMode.Strict)
{
    var writerSettings = new MongoDB.Bson.IO.JsonWriterSettings
    {
        OutputMode = jsonOutputMode,
        Indent = format
    };
    var rawJson = MongoDB.Bson.BsonExtensionMethods.ToJson(value, writerSettings);
    return rawJson.FromJson<JObject>()!.RemoveDuplicateKeys().ToJson()!;
}
/// <summary>
/// Deserializes JSON text into <typeparamref name="T"/> via a <see cref="BsonDocument"/>.
/// </summary>
/// <remarks>
/// When the top-level <c>_t</c> entry is not a string, it is read as a list of strings and
/// replaced by the last entry of that list before parsing.
/// </remarks>
/// <param name="bson">JSON text of the document.</param>
/// <returns>The deserialized object.</returns>
public static T FromBson<T>(this string bson)
{
    var dict = bson.FromJson<Dictionary<string, object>>();
    if (dict != null && dict.TryGetValue("_t", out var discriminator) && discriminator is not string)
    {
        var hierarchy = discriminator.ToJson()!.FromJson<List<string>>();
        dict["_t"] = hierarchy.Last();
    }
    var document = MongoDB.Bson.BsonDocument.Parse(dict.ToJson());
    return document.BsonToObj<T>();
}
/// <summary>
/// Deserializes a <see cref="MongoDB.Bson.BsonDocument"/> into <typeparamref name="T"/>.
/// </summary>
/// <remarks>
/// When the document's <c>_t</c> element is an array it is replaced, on the passed-in document
/// itself, by the array's last element before deserializing.
/// </remarks>
/// <param name="bsonDoc">Document to deserialize; may be modified as described.</param>
/// <returns>The deserialized object.</returns>
public static T BsonToObj<T>(this MongoDB.Bson.BsonDocument bsonDoc)
{
    if (bsonDoc.TryGetValue("_t", out var typeTag) && typeTag.IsBsonArray)
    {
        bsonDoc["_t"] = typeTag.AsBsonArray.Last();
    }
    return MongoDB.Bson.Serialization.BsonSerializer.Deserialize<T>(bsonDoc);
}
/// <summary>
/// A <see cref="MongoDB.Bson.Serialization.Serializers.BsonDocumentSerializer"/> that sets
/// <c>AllowDuplicateElementNames</c> on the deserialization context before delegating to the base class.
/// </summary>
public class AlwaysAllowDuplicateNamesBsonDocumentSerializer : MongoDB.Bson.Serialization.Serializers.BsonDocumentSerializer
{
    // Returns a copy of the context with duplicate element names allowed.
    private static MongoDB.Bson.Serialization.BsonDeserializationContext AllowDuplicates(MongoDB.Bson.Serialization.BsonDeserializationContext context)
        => context.With(c => c.AllowDuplicateElementNames = true);

    /// <summary>Deserializes a document value with duplicate element names allowed.</summary>
    protected override MongoDB.Bson.BsonDocument DeserializeValue(MongoDB.Bson.Serialization.BsonDeserializationContext context, MongoDB.Bson.Serialization.BsonDeserializationArgs args)
        => base.DeserializeValue(AllowDuplicates(context), args);

    /// <summary>Deserializes a document with duplicate element names allowed.</summary>
    public override MongoDB.Bson.BsonDocument Deserialize(MongoDB.Bson.Serialization.BsonDeserializationContext context, MongoDB.Bson.Serialization.BsonDeserializationArgs args)
        => base.Deserialize(AllowDuplicates(context), args);
}
}
}