diff --git a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java index 6e7f86351..f05e293af 100644 --- a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java +++ b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/MongoDBReader.java @@ -25,6 +25,7 @@ import com.qlangtech.tis.plugin.ds.ISelectedTab; import com.qlangtech.tis.plugin.ds.RunningContext; import org.apache.commons.lang.StringUtils; +import org.bson.BsonDocument; import org.bson.Document; /** @@ -120,13 +121,13 @@ public void startRead(RecordSender recordSender) { MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription()); } MongoDatabase db = mongoClient.getDatabase(database); - MongoCollection col = db.getCollection(this.collection); + MongoCollection col = db.getCollection(this.collection, BsonDocument.class); - MongoCursor dbCursor = null; + MongoCursor dbCursor = null; Document filter = mongoTable.getCollectionQueryFilter();// new Document(); dbCursor = col.find(filter).iterator(); - Document item = null; + BsonDocument item = null; // Record record = null; Objects.requireNonNull(this.col2IndexMapper, "col2IndexMapper can not be null"); while (dbCursor.hasNext()) { diff --git a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java index 3153dcfdd..5a5a4275e 100644 --- a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java +++ b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/CollectionSplitUtil.java @@ -20,11 +20,12 @@ * Created by jianying.wcj on 2015/3/19 0019. * Modified by mingyan.zc on 2016/6/13. * Modified by mingyan.zc on 2017/7/5. + * Modified by baisui(baisui@qlangtech.com) on 2024/12/10 */ public class CollectionSplitUtil { public static List doSplit( - Configuration originalSliceConfig, int adviceNumber, MongoClient mongoClient) { + Configuration originalSliceConfig, int adviceNumber, MongoClient mongoClient) { List confList = new ArrayList(); @@ -32,15 +33,15 @@ public static List doSplit( String collName = originalSliceConfig.getString(KeyConstant.MONGO_COLLECTION_NAME); - if(Strings.isNullOrEmpty(dbName) || Strings.isNullOrEmpty(collName) || mongoClient == null) { + if (Strings.isNullOrEmpty(dbName) || Strings.isNullOrEmpty(collName) || mongoClient == null) { throw DataXException.asDataXException(MongoDBReaderErrorCode.ILLEGAL_VALUE, - MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription()); + MongoDBReaderErrorCode.ILLEGAL_VALUE.getDescription()); } boolean isObjectId = isPrimaryIdObjectId(mongoClient, dbName, collName); List rangeList = doSplitCollection(adviceNumber, mongoClient, dbName, collName, isObjectId); - for(Range range : rangeList) { + for (Range range : rangeList) { Configuration conf = originalSliceConfig.clone(); conf.set(KeyConstant.LOWER_BOUND, range.lowerBound); conf.set(KeyConstant.UPPER_BOUND, range.upperBound); @@ -54,6 +55,9 @@ public static List doSplit( private static boolean isPrimaryIdObjectId(MongoClient mongoClient, String dbName, String collName) { MongoDatabase database = mongoClient.getDatabase(dbName); MongoCollection col = database.getCollection(collName); + if (col.estimatedDocumentCount() < 1) { + throw new IllegalStateException("collection:" + collName + " have any doc "); + } Document doc = col.find().limit(1).first(); Object id = doc.get(KeyConstant.MONGO_PRIMARY_ID); if (id instanceof ObjectId) { @@ -95,31 +99,31 @@ private static List doSplitCollection(int adviceNumber, MongoClient mongo boolean supportSplitVector = true; try { database.runCommand(new Document("splitVector", dbName + "." + collName) - .append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1)) - .append("force", true)); + .append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1)) + .append("force", true)); } catch (MongoCommandException e) { if (e.getErrorCode() == KeyConstant.MONGO_UNAUTHORIZED_ERR_CODE || - e.getErrorCode() == KeyConstant.MONGO_ILLEGALOP_ERR_CODE) { + e.getErrorCode() == KeyConstant.MONGO_ILLEGALOP_ERR_CODE) { supportSplitVector = false; } } if (supportSplitVector) { - final boolean forceMedianSplit = true; + final boolean forceMedianSplit = true; int maxChunkSize = (docCount / splitPointCount - 1) * 2 * avgObjSize / (1024 * 1024); //int maxChunkSize = (chunkDocCount - 1) * 2 * avgObjSize / (1024 * 1024); if (maxChunkSize < 1) { - // forceMedianSplit = true; + // forceMedianSplit = true; } if (!forceMedianSplit) { result = database.runCommand(new Document("splitVector", dbName + "." + collName) - .append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1)) - .append("maxChunkSize", maxChunkSize) - .append("maxSplitPoints", adviceNumber - 1)); + .append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1)) + .append("maxChunkSize", maxChunkSize) + .append("maxSplitPoints", adviceNumber - 1)); } else { result = database.runCommand(new Document("splitVector", dbName + "." + collName) - .append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1)) - .append("force", true)); + .append("keyPattern", new Document(KeyConstant.MONGO_PRIMARY_ID, 1)) + .append("force", true)); } ArrayList splitKeys = result.get("splitKeys", ArrayList.class); @@ -127,7 +131,7 @@ private static List doSplitCollection(int adviceNumber, MongoClient mongo Document splitKey = splitKeys.get(i); Object id = splitKey.get(KeyConstant.MONGO_PRIMARY_ID); if (isObjectId) { - ObjectId oid = (ObjectId)id; + ObjectId oid = (ObjectId) id; splitPoints.add(oid.toHexString()); } else { splitPoints.add(id); @@ -141,7 +145,7 @@ private static List doSplitCollection(int adviceNumber, MongoClient mongo Document doc = col.find().skip(skipCount).limit(chunkDocCount).first(); Object id = doc.get(KeyConstant.MONGO_PRIMARY_ID); if (isObjectId) { - ObjectId oid = (ObjectId)id; + ObjectId oid = (ObjectId) id; splitPoints.add(oid.toHexString()); } else { splitPoints.add(id); diff --git a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/IMongoTable.java b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/IMongoTable.java index f4a87371e..f01b61f83 100644 --- a/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/IMongoTable.java +++ b/mongodbreader/src/main/java/com/alibaba/datax/plugin/reader/mongodbreader/util/IMongoTable.java @@ -1,6 +1,7 @@ package com.alibaba.datax.plugin.reader.mongodbreader.util; import com.alibaba.datax.common.element.Record; +import org.bson.BsonDocument; import org.bson.Document; /** @@ -17,6 +18,6 @@ public interface IMongoTable { */ //public List getMongoPresentCols(); - Record convert2RecordByItem(Record record, Document item); + Record convert2RecordByItem(Record record, BsonDocument item); }