Skip to content

Commit

Permalink
Add check that estimatedDocumentCount cannot be smaller than 1
Browse files (browse the repository at this point in the history)
  • Loading branch information
baisui1981 committed Dec 10, 2024
1 parent 117ce4a commit e57bcd4
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.qlangtech.tis.plugin.ds.ISelectedTab;
import com.qlangtech.tis.plugin.ds.RunningContext;
import org.apache.commons.lang.StringUtils;
import org.bson.BsonDocument;
import org.bson.Document;

/**
Expand Down Expand Up @@ -120,13 +121,13 @@ public void startRead(RecordSender recordSender) {
MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
}
MongoDatabase db = mongoClient.getDatabase(database);
MongoCollection col = db.getCollection(this.collection);
MongoCollection<BsonDocument> col = db.getCollection(this.collection, BsonDocument.class);

MongoCursor<Document> dbCursor = null;
MongoCursor<BsonDocument> dbCursor = null;
Document filter = mongoTable.getCollectionQueryFilter();// new Document();

dbCursor = col.find(filter).iterator();
Document item = null;
BsonDocument item = null;
// Record record = null;
Objects.requireNonNull(this.col2IndexMapper, "col2IndexMapper can not be null");
while (dbCursor.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,27 +20,28 @@
* Created by jianying.wcj on 2015/3/19 0019.
* Modified by mingyan.zc on 2016/6/13.
* Modified by mingyan.zc on 2017/7/5.
* Modified by baisui([email protected]) on 2024/12/10
*/
public class CollectionSplitUtil {

public static List<Configuration> doSplit(
Configuration originalSliceConfig, int adviceNumber, MongoClient mongoClient) {
Configuration originalSliceConfig, int adviceNumber, MongoClient mongoClient) {

List<Configuration> confList = new ArrayList<Configuration>();

String dbName = originalSliceConfig.getString(KeyConstant.MONGO_DB_NAME, originalSliceConfig.getString(KeyConstant.MONGO_DATABASE));

String collName = originalSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME);

if(Strings.isNullOrEmpty(dbName) || Strings.isNullOrEmpty(collName) || mongoClient == null) {
if (Strings.isNullOrEmpty(dbName) || Strings.isNullOrEmpty(collName) || mongoClient == null) {
throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE,
MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription());
}

boolean isObjectId = isPrimaryIdObjectId(mongoClient, dbName, collName);

List<Range> rangeList = doSplitCollection(adviceNumber, mongoClient, dbName, collName, isObjectId);
for(Range range : rangeList) {
for (Range range : rangeList) {
Configuration conf = originalSliceConfig.clone();
conf.set(KeyConstant.LOWER_BOUND, range.lowerBound);
conf.set(KeyConstant.UPPER_BOUND, range.upperBound);
Expand All @@ -54,6 +55,9 @@ public static List<Configuration> doSplit(
private static boolean isPrimaryIdObjectId(MongoClient mongoClient, String dbName, String collName) {
MongoDatabase database = mongoClient.getDatabase(dbName);
MongoCollection<Document> col = database.getCollection(collName);
if (col.estimatedDocumentCount() < 1) {
throw new IllegalStateException("collection:" + collName + " have any doc ");
}
Document doc = col.find().limit(1).first();
Object id = doc.get(KeyConstant.MONGO_PRIMARY_ID);
if (id instanceof ObjectId) {
Expand Down Expand Up @@ -95,39 +99,39 @@ private static List<Range> doSplitCollection(int adviceNumber, MongoClient mongo
boolean supportSplitVector = true;
try {
database.runCommand(new Document("splitVector", dbName + "." + collName)
.append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1))
.append("force", true));
.append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1))
.append("force", true));
} catch (MongoCommandException e) {
if (e.getErrorCode() == KeyConstant.MONGO_UNAUTHORIZED_ERR_CODE ||
e.getErrorCode() == KeyConstant.MONGO_ILLEGALOP_ERR_CODE) {
e.getErrorCode() == KeyConstant.MONGO_ILLEGALOP_ERR_CODE) {
supportSplitVector = false;
}
}

if (supportSplitVector) {
final boolean forceMedianSplit = true;
final boolean forceMedianSplit = true;
int maxChunkSize = (docCount / splitPointCount - 1) * 2 * avgObjSize / (1024 * 1024);
//int maxChunkSize = (chunkDocCount - 1) * 2 * avgObjSize / (1024 * 1024);
if (maxChunkSize < 1) {
// forceMedianSplit = true;
// forceMedianSplit = true;
}
if (!forceMedianSplit) {
result = database.runCommand(new Document("splitVector", dbName + "." + collName)
.append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1))
.append("maxChunkSize", maxChunkSize)
.append("maxSplitPoints", adviceNumber - 1));
.append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1))
.append("maxChunkSize", maxChunkSize)
.append("maxSplitPoints", adviceNumber - 1));
} else {
result = database.runCommand(new Document("splitVector", dbName + "." + collName)
.append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1))
.append("force", true));
.append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1))
.append("force", true));
}
ArrayList<Document> splitKeys = result.get("splitKeys", ArrayList.class);

for (int i = 0; i < splitKeys.size(); i++) {
Document splitKey = splitKeys.get(i);
Object id = splitKey.get(KeyConstant.MONGO_PRIMARY_ID);
if (isObjectId) {
ObjectId oid = (ObjectId)id;
ObjectId oid = (ObjectId) id;
splitPoints.add(oid.toHexString());
} else {
splitPoints.add(id);
Expand All @@ -141,7 +145,7 @@ private static List<Range> doSplitCollection(int adviceNumber, MongoClient mongo
Document doc = col.find().skip(skipCount).limit(chunkDocCount).first();
Object id = doc.get(KeyConstant.MONGO_PRIMARY_ID);
if (isObjectId) {
ObjectId oid = (ObjectId)id;
ObjectId oid = (ObjectId) id;
splitPoints.add(oid.toHexString());
} else {
splitPoints.add(id);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.alibaba.datax.plugin.reader.mongodbreader.util;

import com.alibaba.datax.common.element.Record;
import org.bson.BsonDocument;
import org.bson.Document;

/**
Expand All @@ -17,6 +18,6 @@ public interface IMongoTable {
*/
//public List<IMongoCol> getMongoPresentCols();

Record convert2RecordByItem(Record record, Document item);
Record convert2RecordByItem(Record record, BsonDocument item);

}

0 comments on commit e57bcd4

Please sign in to comment.