Skip to content

Commit

Permalink
Parameterize segment IDs (#16174)
Browse files Browse the repository at this point in the history
* Add parameterized segment IDs.

* Refactor into one common method.

* Refactor getConditionForIntervalsAndMatchMode - pass in only what's needed.

* Minor cleanup.
  • Loading branch information
abhishekrb19 authored Mar 22, 2024
1 parent c72e69a commit a70e28a
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 46 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -185,11 +185,12 @@ public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(Strin
.allMatch(Intervals::canCompareEndpointsAsStrings);
final SqlSegmentsMetadataQuery.IntervalMode intervalMode = SqlSegmentsMetadataQuery.IntervalMode.OVERLAPS;

SqlSegmentsMetadataQuery.appendConditionForIntervalsAndMatchMode(
queryBuilder,
compareIntervalEndpointsAsString ? intervals : Collections.emptyList(),
intervalMode,
connector
queryBuilder.append(
SqlSegmentsMetadataQuery.getConditionForIntervalsAndMatchMode(
compareIntervalEndpointsAsString ? intervals : Collections.emptyList(),
intervalMode,
connector.getQuoteString()
)
);

final String queryString = StringUtils.format(queryBuilder.toString(), dbTables.getSegmentsTable());
Expand All @@ -200,7 +201,7 @@ public List<Pair<DataSegment, String>> retrieveUsedSegmentsAndCreatedDates(Strin
.bind("dataSource", dataSource);

if (compareIntervalEndpointsAsString) {
SqlSegmentsMetadataQuery.bindQueryIntervals(query, intervals);
SqlSegmentsMetadataQuery.bindIntervalsToQuery(query, intervals);
}

final List<Pair<DataSegment, String>> segmentsWithCreatedDates = query
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,20 +256,20 @@ public List<DataSegmentPlus> retrieveSegmentsById(String datasource, Set<String>

private List<DataSegmentPlus> retrieveSegmentBatchById(String datasource, List<String> segmentIds)
{
if (segmentIds.isEmpty()) {
if (CollectionUtils.isNullOrEmpty(segmentIds)) {
return Collections.emptyList();
}

final String segmentIdCsv = segmentIds.stream()
.map(id -> "'" + id + "'")
.collect(Collectors.joining(","));
ResultIterator<DataSegmentPlus> resultIterator = handle
.createQuery(
StringUtils.format(
"SELECT payload, used FROM %s WHERE dataSource = :dataSource AND id IN (%s)",
dbTables.getSegmentsTable(), segmentIdCsv
)
final Query<Map<String, Object>> query = handle.createQuery(
StringUtils.format(
"SELECT payload, used FROM %s WHERE dataSource = :dataSource %s",
dbTables.getSegmentsTable(), getParameterizedInConditionForColumn("id", segmentIds)
)
);

bindColumnValuesToQueryWithInCondition("id", segmentIds, query);

ResultIterator<DataSegmentPlus> resultIterator = query
.bind("dataSource", datasource)
.setFetchSize(connector.getStreamingFetchSize())
.map(
Expand Down Expand Up @@ -357,7 +357,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval,
final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);

if (hasVersions) {
sb.append(getConditionForVersions(versions));
sb.append(getParameterizedInConditionForColumn("version", versions));
}

final Update stmt = handle
Expand All @@ -367,7 +367,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval,
.bind("used_status_last_updated", DateTimes.nowUtc().toString());

if (hasVersions) {
bindVersionsToQuery(stmt, versions);
bindColumnValuesToQueryWithInCondition("version", versions, stmt);
}

return stmt.execute();
Expand All @@ -389,7 +389,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval,
final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);

if (hasVersions) {
sb.append(getConditionForVersions(versions));
sb.append(getParameterizedInConditionForColumn("version", versions));
}

final Update stmt = handle
Expand All @@ -401,7 +401,7 @@ public int markSegmentsUnused(final String dataSource, final Interval interval,
.bind("used_status_last_updated", DateTimes.nowUtc().toString());

if (hasVersions) {
bindVersionsToQuery(stmt, versions);
bindColumnValuesToQueryWithInCondition("version", versions, stmt);
}
return stmt.execute();
} else {
Expand Down Expand Up @@ -471,28 +471,28 @@ public DataSegment retrieveSegmentForId(String id)
}

/**
* Append the condition for the interval and match mode to the given string builder with a partial query
* @param sb - StringBuilder containing the partial query with SELECT clause and WHERE condition for used, datasource
* Get the condition for the interval and match mode.
* @param intervals - intervals to fetch the segments for
* @param matchMode - Interval match mode - overlaps or contains
* @param connector - SQL connector
* @param quoteString - the connector-specific quote string
*/
public static void appendConditionForIntervalsAndMatchMode(
final StringBuilder sb,
public static String getConditionForIntervalsAndMatchMode(
final Collection<Interval> intervals,
final IntervalMode matchMode,
final SQLMetadataConnector connector
final String quoteString
)
{
if (intervals.isEmpty()) {
return;
return "";
}

final StringBuilder sb = new StringBuilder();

sb.append(" AND (");
for (int i = 0; i < intervals.size(); i++) {
sb.append(
matchMode.makeSqlCondition(
connector.getQuoteString(),
quoteString,
StringUtils.format(":start%d", i),
StringUtils.format(":end%d", i)
)
Expand Down Expand Up @@ -525,14 +525,14 @@ public static void appendConditionForIntervalsAndMatchMode(
));
}
sb.append(")");
return sb.toString();
}

/**
* Given a Query object bind the input intervals to it
* @param query Query to fetch segments
* @param intervals Intervals to fetch segments for
* Bind the supplied {@code intervals} to {@code query}.
* @see #getConditionForIntervalsAndMatchMode(Collection, IntervalMode, String)
*/
public static void bindQueryIntervals(final Query<Map<String, Object>> query, final Collection<Interval> intervals)
public static void bindIntervalsToQuery(final Query<Map<String, Object>> query, final Collection<Interval> intervals)
{
if (intervals.isEmpty()) {
return;
Expand Down Expand Up @@ -730,13 +730,13 @@ private Query<Map<String, Object>> buildSegmentsTableQuery(
}

if (compareAsString) {
appendConditionForIntervalsAndMatchMode(sb, intervals, matchMode, connector);
sb.append(getConditionForIntervalsAndMatchMode(intervals, matchMode, connector.getQuoteString()));
}

final boolean hasVersions = !CollectionUtils.isNullOrEmpty(versions);

if (hasVersions) {
sb.append(getConditionForVersions(versions));
sb.append(getParameterizedInConditionForColumn("version", versions));
}

// Add the used_status_last_updated time filter only for unused segments when maxUsedStatusLastUpdatedTime is non-null.
Expand Down Expand Up @@ -783,11 +783,11 @@ private Query<Map<String, Object>> buildSegmentsTableQuery(
}

if (compareAsString) {
bindQueryIntervals(sql, intervals);
bindIntervalsToQuery(sql, intervals);
}

if (hasVersions) {
bindVersionsToQuery(sql, versions);
bindColumnValuesToQueryWithInCondition("version", versions, sql);
}

return sql;
Expand Down Expand Up @@ -890,33 +890,49 @@ private static int computeNumChangedSegments(List<String> segmentIds, int[] segm
return numChangedSegments;
}

private static String getConditionForVersions(final List<String> versions)
/**
* @return a parameterized {@code IN} clause for the specified {@code columnName}. The column values need to be bound
* to a query by calling {@link #bindColumnValuesToQueryWithInCondition(String, List, SQLStatement)}.
*
* @implNote JDBI 3.x has better support for binding {@code IN} clauses directly.
*/
private static String getParameterizedInConditionForColumn(final String columnName, final List<String> values)
{
if (CollectionUtils.isNullOrEmpty(versions)) {
if (CollectionUtils.isNullOrEmpty(values)) {
return "";
}

final StringBuilder sb = new StringBuilder();

sb.append(" AND version IN (");
for (int i = 0; i < versions.size(); i++) {
sb.append(StringUtils.format(":version%d", i));
if (i != versions.size() - 1) {
sb.append(StringUtils.format(" AND %s IN (", columnName));
for (int i = 0; i < values.size(); i++) {
sb.append(StringUtils.format(":%s%d", columnName, i));
if (i != values.size() - 1) {
sb.append(",");
}
}
sb.append(")");
return sb.toString();
}

private static void bindVersionsToQuery(final SQLStatement query, final List<String> versions)
/**
* Binds the provided list of {@code values} to the specified {@code columnName} in the given SQL {@code query} that
* contains an {@code IN} clause.
*
* @see #getParameterizedInConditionForColumn(String, List)
*/
private static void bindColumnValuesToQueryWithInCondition(
final String columnName,
final List<String> values,
final SQLStatement<?> query
)
{
if (CollectionUtils.isNullOrEmpty(versions)) {
if (CollectionUtils.isNullOrEmpty(values)) {
return;
}

for (int i = 0; i < versions.size(); i++) {
query.bind(StringUtils.format("version%d", i), versions.get(i));
for (int i = 0; i < values.size(); i++) {
query.bind(StringUtils.format("%s%d", columnName, i), values.get(i));
}
}

Expand Down

0 comments on commit a70e28a

Please sign in to comment.