Skip to content

Commit

Permalink
EXPB-2299 Speed up schema lookup for huge schemas
Browse files Browse the repository at this point in the history
  • Loading branch information
kramerul committed Jul 17, 2024
1 parent c2b5b7e commit 7aad3e0
Show file tree
Hide file tree
Showing 39 changed files with 518 additions and 365 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.runtime.Hook;
import org.apache.calcite.schema.LikePattern;
import org.apache.calcite.schema.lookup.LikePattern;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Table;
import org.apache.calcite.schema.impl.AbstractSchema;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import org.apache.calcite.rel.RelCollation;
import org.apache.calcite.rel.RelCollations;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.LikePattern;
import org.apache.calcite.schema.Lookup;
import org.apache.calcite.schema.lookup.LikePattern;
import org.apache.calcite.schema.lookup.Lookup;
import org.apache.calcite.schema.QueryableTable;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,28 +16,34 @@
*/
package org.apache.calcite.adapter.jdbc;

import com.google.common.collect.ImmutableSet;

import org.apache.calcite.DataContext;
import org.apache.calcite.linq4j.tree.Expression;
import org.apache.calcite.linq4j.tree.Expressions;
import org.apache.calcite.schema.lookup.LikePattern;
import org.apache.calcite.schema.lookup.Lookup;
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.schema.Schemas;
import org.apache.calcite.schema.Wrapper;
import org.apache.calcite.schema.impl.AbstractSchema;
import org.apache.calcite.schema.lookup.CachingLookup;
import org.apache.calcite.schema.lookup.IgnoreCaseLookup;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlDialectFactory;
import org.apache.calcite.sql.SqlDialectFactoryImpl;
import org.apache.calcite.util.BuiltInMethod;

import com.google.common.base.Suppliers;
import com.google.common.collect.ImmutableMap;

import org.checkerframework.checker.nullness.qual.Nullable;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import javax.sql.DataSource;

Expand All @@ -60,11 +66,12 @@ public class JdbcCatalogSchema extends AbstractSchema implements Wrapper {
public final SqlDialect dialect;
final JdbcConvention convention;
final String catalog;
private final Lookup<JdbcSchema> subSchemas;

/** Sub-schemas by name, lazily initialized. */
/** default schema name, lazily initialized. */
@SuppressWarnings({"method.invocation.invalid", "Convert2MethodRef"})
final Supplier<SubSchemaMap> subSchemaMapSupplier =
Suppliers.memoize(() -> computeSubSchemaMap());
private final Supplier<String> defaultSchemaName =
Suppliers.memoize(() -> computeDefaultSchemaName());

/** Creates a JdbcCatalogSchema. */
public JdbcCatalogSchema(DataSource dataSource, SqlDialect dialect,
Expand All @@ -73,6 +80,41 @@ public JdbcCatalogSchema(DataSource dataSource, SqlDialect dialect,
this.dialect = requireNonNull(dialect, "dialect");
this.convention = requireNonNull(convention, "convention");
this.catalog = catalog;
this.subSchemas = new CachingLookup<>(new IgnoreCaseLookup<JdbcSchema>() {
@Override
public @Nullable JdbcSchema get(String name) {
try (Connection connection = dataSource.getConnection();
ResultSet resultSet =
connection.getMetaData().getSchemas(catalog, name)) {
while (resultSet.next()) {
final String schemaName =
requireNonNull(resultSet.getString(1),
"got null schemaName from the database");
return new JdbcSchema(dataSource, dialect, convention, catalog, schemaName);
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
return null;
}

@Override
public Set<String> getNames(LikePattern pattern) {
final ImmutableSet.Builder<String> builder =
ImmutableSet.builder();
try (Connection connection = dataSource.getConnection();
ResultSet resultSet =
connection.getMetaData().getSchemas(catalog, pattern.pattern)) {
while (resultSet.next()) {
builder.add(requireNonNull(resultSet.getString(1),
"got null schemaName from the database"));
}
} catch (SQLException e) {
throw new RuntimeException(e);
}
return builder.build();
}
});
}

public static JdbcCatalogSchema create(
Expand Down Expand Up @@ -103,34 +145,25 @@ public static JdbcCatalogSchema create(
return new JdbcCatalogSchema(dataSource, dialect, convention, catalog);
}

private SubSchemaMap computeSubSchemaMap() {
final ImmutableMap.Builder<String, Schema> builder =
ImmutableMap.builder();
String defaultSchemaName;
try (Connection connection = dataSource.getConnection();
ResultSet resultSet =
connection.getMetaData().getSchemas(catalog, null)) {
defaultSchemaName = connection.getSchema();
while (resultSet.next()) {
final String schemaName =
requireNonNull(resultSet.getString(1),
"got null schemaName from the database");
builder.put(schemaName,
new JdbcSchema(dataSource, dialect, convention, catalog, schemaName));
}
@Override public Lookup<? extends Schema> subSchemas() {
return subSchemas;
}

private String computeDefaultSchemaName() {
try (Connection connection = dataSource.getConnection()) {
return connection.getSchema();
} catch (SQLException e) {
throw new RuntimeException(e);
}
return new SubSchemaMap(defaultSchemaName, builder.build());
}

@Override protected Map<String, Schema> getSubSchemaMap() {
return subSchemaMapSupplier.get().map;
throw new UnsupportedOperationException("getSubSchemaMap");
}

/** Returns the name of the default sub-schema. */
public String getDefaultSubSchemaName() {
return subSchemaMapSupplier.get().defaultSchemaName;
return defaultSchemaName.get();
}

/** Returns the data source. */
Expand All @@ -148,16 +181,4 @@ public DataSource getDataSource() {
}
return null;
}

/** Contains sub-schemas by name, and the name of the default schema. */
private static class SubSchemaMap {
final String defaultSchemaName;
final ImmutableMap<String, Schema> map;

private SubSchemaMap(String defaultSchemaName,
ImmutableMap<String, Schema> map) {
this.defaultSchemaName = defaultSchemaName;
this.map = map;
}
}
}
31 changes: 14 additions & 17 deletions core/src/main/java/org/apache/calcite/adapter/jdbc/JdbcSchema.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,6 @@
*/
package org.apache.calcite.adapter.jdbc;

import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;

import com.google.common.util.concurrent.UncheckedExecutionException;

import org.apache.calcite.avatica.AvaticaUtils;
import org.apache.calcite.avatica.MetaImpl;
import org.apache.calcite.avatica.SqlType;
Expand All @@ -33,8 +27,10 @@
import org.apache.calcite.rel.type.RelDataTypeSystem;
import org.apache.calcite.rel.type.RelProtoDataType;
import org.apache.calcite.schema.*;
import org.apache.calcite.schema.impl.CachingLookup;
import org.apache.calcite.schema.impl.IgnoreCaseLookup;
import org.apache.calcite.schema.lookup.CachingLookup;
import org.apache.calcite.schema.lookup.IgnoreCaseLookup;
import org.apache.calcite.schema.lookup.LikePattern;
import org.apache.calcite.schema.lookup.Lookup;
import org.apache.calcite.sql.SqlDialect;
import org.apache.calcite.sql.SqlDialectFactory;
import org.apache.calcite.sql.SqlDialectFactoryImpl;
Expand All @@ -46,7 +42,6 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableMultimap;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Multimap;
import com.google.common.collect.Ordering;

Expand All @@ -64,12 +59,9 @@
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -111,6 +103,7 @@ public Set<String> getNames(LikePattern pattern) {
}
}
});
private final Lookup<JdbcSchema> subSchemas = Lookup.empty();

@Experimental
public static final ThreadLocal<@Nullable Foo> THREAD_METADATA = new ThreadLocal<>();
Expand Down Expand Up @@ -238,6 +231,11 @@ public static DataSource dataSource(String url, @Nullable String driverClassName
return tables;
}

@Override public Lookup<? extends Schema> subSchemas() {
return subSchemas;
}


@Override public boolean isMutable() {
return false;
}
Expand Down Expand Up @@ -534,13 +532,12 @@ protected Map<String, RelProtoDataType> getTypes() {
return (Set<String>) getTypes().keySet();
}

@Override public @Nullable Schema getSubSchema(String name) {
// JDBC does not support sub-schemas.
return null;
@Deprecated @Override public @Nullable Schema getSubSchema(String name) {
return subSchemas.get(name);
}

@Override public Set<String> getSubSchemaNames() {
return ImmutableSet.of();
@Deprecated @Override public Set<String> getSubSchemaNames() {
return subSchemas.getNames(LikePattern.any());
}

@Override public <T extends Object> @Nullable T unwrap(Class<T> clazz) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ private static TableScanNode createQueryable(Compiler compiler,
requireNonNull(schema, () ->
"schema is null while resolving " + name + " for table"
+ relOptTable.getQualifiedName());
schema = schema.getSubSchema(name);
schema = schema.subSchemas().get(name);
}
final Enumerable<Row> rowEnumerable;
if (elementType instanceof Class) {
Expand Down
Loading

0 comments on commit 7aad3e0

Please sign in to comment.