Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parameterize segment IDs #16174

Merged
merged 4 commits into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,12 @@ public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(Strin
.allMatch(Intervals::canCompareEndpointsAsStrings);
final SqlSegmentsMetadataQuery.IntervalMode intervalMode = SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS;

SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode(
queryBuilder,
compareIntervalEndpointsAsString ? intervals : Collections.emptyList(),
intervalMode,
connector
queryBuilder.append(
SqlSegmentsMetadataQuery.getConditionForIntervalsAndMatchMode(
compareIntervalEndpointsAsString ? intervals : Collections.emptyList(),
intervalMode,
connector.getQuoteString()
)
);

final String queryString = StringUtils.format(queryBuilder.toString(), dbTables.getSegmentsTable());
Expand All @@ -200,7 +201,7 @@ public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(Strin
.bind("dataSource", dataSource);

if (compareIntervalEndpointsAsString) {
SqlSegmentsMetadataQuery.bindQueryIntervals(query, intervals);
SqlSegmentsMetadataQuery.bindIntervalsToQuery(query, intervals);
}

final List<Pair<DataSegment, String>> segmentsWithCreatedDates = query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,20 @@ public List<DataSegmentPlus> retrieveSegmentsById(String datasource, Set<String>

private List<DataSegmentPlus> retrieveSegmentBatchById(String datasource, List<String> segmentIds)
{
if (segmentIds.isEmpty()) {
if (CollectionUtils.isNullOrEmpty(segmentIds)) {
return Collections.emptyList();
}

final String segmentIdCsv = segmentIds.stream()
.map(id -> "'" + id + "'")
.collect(Collectors.joining(","));
ResultIterator<DataSegmentPlus> resultIterator = handle
.createQuery(
StringUtils.format(
"SELECT payload, used FROM %s WHERE dataSource = :dataSource AND id IN (%s)",
dbTables.getSegmentsTable(), segmentIdCsv
)
final Query<Map<String, Object>> query = handle.createQuery(
StringUtils.format(
"SELECT payload, used FROM %s WHERE dataSource = :dataSource %s",
dbTables.getSegmentsTable(), getParameterizedInConditionForColumn("id", segmentIds)
)
);

bindColumnValuesToQueryWithInCondition("id", segmentIds, query);

ResultIterator<DataSegmentPlus> resultIterator = query
.bind("dataSource", datasource)
.setFetchSize(connector.getStreamingFetchSize())
.map(
Expand Down Expand Up @@ -357,7 +357,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval,
final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);

if (hasVersions) {
sb.append(getConditionForVersions(versions));
sb.append(getParameterizedInConditionForColumn("version", versions));
}

final Update stmt = handle
Expand All @@ -367,7 +367,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval,
.bind("used_status_last_updated", DateTimes.nowUtc().toString());

if (hasVersions) {
bindVersionsToQuery(stmt, versions);
bindColumnValuesToQueryWithInCondition("version", versions, stmt);
}

return stmt.execute();
Expand All @@ -389,7 +389,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval,
final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);

if (hasVersions) {
sb.append(getConditionForVersions(versions));
sb.append(getParameterizedInConditionForColumn("version", versions));
}

final Update stmt = handle
Expand All @@ -401,7 +401,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval,
.bind("used_status_last_updated", DateTimes.nowUtc().toString());

if (hasVersions) {
bindVersionsToQuery(stmt, versions);
bindColumnValuesToQueryWithInCondition("version", versions, stmt);
}
return stmt.execute();
} else {
Expand Down Expand Up @@ -471,28 +471,28 @@ public DataSegment retrieveSegmentForId(String id)
}

/**
* Append the condition for the interval and match mode to the given string builder with a partial query
* @param sb - StringBuilder containing the paritial query with SELECT clause and WHERE condition for used, datasource
* Get the condition for the interval and match mode.
* @param intervals - intervals to fetch the segments for
* @param matchMode - Interval match mode - overlaps or contains
* @param connector - SQL connector
* @param quoteString - the connector-specific quote string
*/
public static void appendConditionForIntervalsAndMatchMode(
final StringBuilder sb,
public static String getConditionForIntervalsAndMatchMode(
final Collection<Interval> intervals,
final IntervalMode matchMode,
final SQLMetadataConnector connector
final String quoteString
)
{
if (intervals.isEmpty()) {
return;
return "";
}

final StringBuilder sb = new StringBuilder();

sb.append(" AND (");
for (int i = 0; i < intervals.size(); i++) {
sb.append(
matchMode.makeSqlCondition(
connector.getQuoteString(),
quoteString,
StringUtils.format(":start%d", i),
StringUtils.format(":end%d", i)
)
Expand Down Expand Up @@ -525,14 +525,14 @@ public static void appendConditionForIntervalsAndMatchMode(
));
}
sb.append(")");
return sb.toString();
}

/**
* Given a Query object bind the input intervals to it
* @param query Query to fetch segments
* @param intervals Intervals to fetch segments for
* Bind the supplied {@code intervals} to {@code query}.
* @see #getConditionForIntervalsAndMatchMode(Collection, IntervalMode, String)
*/
public static void bindQueryIntervals(final Query<Map<String, Object>> query, final Collection<Interval> intervals)
public static void bindIntervalsToQuery(final Query<Map<String, Object>> query, final Collection<Interval> intervals)
{
if (intervals.isEmpty()) {
return;
Expand Down Expand Up @@ -730,13 +730,13 @@ private Query<Map<String, Object>> buildSegmentsTableQuery(
}

if (compareAsString) {
appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
sb.append(getConditionForIntervalsAndMatchMode(intervals, matchMode, connector.getQuoteString()));
}

final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);

if (hasVersions) {
sb.append(getConditionForVersions(versions));
sb.append(getParameterizedInConditionForColumn("version", versions));
}

// Add the used_status_last_updated time filter only for unused segments when maxUsedStatusLastUpdatedTime is non-null.
Expand Down Expand Up @@ -783,11 +783,11 @@ private Query<Map<String, Object>> buildSegmentsTableQuery(
}

if (compareAsString) {
bindQueryIntervals(sql, intervals);
bindIntervalsToQuery(sql, intervals);
}

if (hasVersions) {
bindVersionsToQuery(sql, versions);
bindColumnValuesToQueryWithInCondition("version", versions, sql);
}

return sql;
Expand Down Expand Up @@ -890,33 +890,49 @@ private static int computeNumChangedSegments(List<String> segmentIds, int[] segm
return numChangedSegments;
}

private static String getConditionForVersions(final List<String> versions)
/**
* @return a parameterized {@code IN} clause for the specified {@code columnName}. The column values need to be bound
* to a query by calling {@link #bindColumnValuesToQueryWithInCondition(String, List, SQLStatement)}.
*
* @implNote JDBI 3.x has better support for binding {@code IN} clauses directly.
*/
private static String getParameterizedInConditionForColumn(final String columnName, final List<String> values)
{
if (CollectionUtils.isNullOrEmpty(versions)) {
if (CollectionUtils.isNullOrEmpty(values)) {
return "";
}

final StringBuilder sb = new StringBuilder();

sb.append(" AND version IN (");
for (int i = 0; i < versions.size(); i++) {
sb.append(StringUtils.format(":version%d", i));
if (i != versions.size() - 1) {
sb.append(StringUtils.format(" AND %s IN (", columnName));
for (int i = 0; i < values.size(); i++) {
sb.append(StringUtils.format(":%s%d", columnName, i));
if (i != values.size() - 1) {
sb.append(",");
}
}
sb.append(")");
return sb.toString();
}

private static void bindVersionsToQuery(final SQLStatement query, final List<String> versions)
/**
* Binds the provided list of {@code values} to the specified {@code columnName} in the given SQL {@code query} that
* contains an {@code IN} clause.
*
* @see #getParameterizedInConditionForColumn(String, List)
*/
private static void bindColumnValuesToQueryWithInCondition(
final String columnName,
final List<String> values,
final SQLStatement<?> query
)
{
if (CollectionUtils.isNullOrEmpty(versions)) {
if (CollectionUtils.isNullOrEmpty(values)) {
return;
}

for (int i = 0; i < versions.size(); i++) {
query.bind(StringUtils.format("version%d", i), versions.get(i));
for (int i = 0; i < values.size(); i++) {
query.bind(StringUtils.format("%s%d", columnName, i), values.get(i));
}
}

Expand Down
Loading