Skip to content

Commit

Permalink
[CDAP-20872] Add data encryption for CredentialIdentityStore and Cred…
Browse files Browse the repository at this point in the history
…entialProfileStore
  • Loading branch information
dli357 committed Nov 13, 2023
1 parent eb88979 commit 838fb75
Show file tree
Hide file tree
Showing 8 changed files with 161 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@
import io.cdap.cdap.scheduler.CoreSchedulerService;
import io.cdap.cdap.scheduler.Scheduler;
import io.cdap.cdap.securestore.spi.SecretStore;
import io.cdap.cdap.security.encryption.guice.AeadEncryptionModule;
import io.cdap.cdap.security.impersonation.DefaultOwnerAdmin;
import io.cdap.cdap.security.impersonation.DefaultUGIProvider;
import io.cdap.cdap.security.impersonation.OwnerAdmin;
Expand Down Expand Up @@ -200,6 +201,7 @@ public Module getInMemoryModules() {
new EntityVerifierModule(),
new MasterCredentialProviderModule(),
new OperationModule(),
new AeadEncryptionModule(),
BootstrapModules.getInMemoryModule(),
new AbstractModule() {
@Override
Expand Down Expand Up @@ -243,6 +245,7 @@ public Module getStandaloneModules() {
new ProvisionerModule(),
new MasterCredentialProviderModule(),
new OperationModule(),
new AeadEncryptionModule(),
BootstrapModules.getFileBasedModule(),
new AbstractModule() {
@Override
Expand Down Expand Up @@ -298,6 +301,7 @@ public Module getDistributedModules() {
new ProvisionerModule(),
new MasterCredentialProviderModule(),
new OperationModule(),
new AeadEncryptionModule(),
BootstrapModules.getFileBasedModule(),
new AbstractModule() {
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@
import io.cdap.cdap.internal.sysapp.SystemAppManagementService;
import io.cdap.cdap.proto.id.NamespaceId;
import io.cdap.cdap.scheduler.CoreSchedulerService;
import io.cdap.cdap.security.encryption.AeadCipherService;
import io.cdap.cdap.security.encryption.guice.AeadEncryptionModule;
import io.cdap.cdap.sourcecontrol.RepositoryCleanupService;
import io.cdap.cdap.sourcecontrol.operationrunner.SourceControlOperationRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
Expand Down Expand Up @@ -97,6 +99,8 @@ public class AppFabricServer extends AbstractIdleService {
private final SConfiguration sConf;
private final boolean sslEnabled;
private final TransactionRunner transactionRunner;
private final AeadCipherService userEncryptionAeadCipherService;
private final AeadCipherService dataStorageAeadCipherService;

private Cancellable cancelHttpService;
private Set<HttpHandler> handlers;
Expand Down Expand Up @@ -130,7 +134,12 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
CommonNettyHttpServiceFactory commonNettyHttpServiceFactory,
RunRecordTimeToLiveService runRecordTimeToLiveService,
SourceControlOperationRunner sourceControlOperationRunner,
RepositoryCleanupService repositoryCleanupService) {
RepositoryCleanupService repositoryCleanupService,
@Named(AeadEncryptionModule.USER_CREDENTIAL_ENCRYPTION)
AeadCipherService userCredentialAeadCipherService,
@Named(AeadEncryptionModule.DATA_STORAGE_ENCRYPTION)
AeadCipherService dataStorageAeadCipherService
) {
this.hostname = hostname;
this.discoveryService = discoveryService;
this.handlers = handlers;
Expand Down Expand Up @@ -158,6 +167,8 @@ public AppFabricServer(CConfiguration cConf, SConfiguration sConf,
this.commonNettyHttpServiceFactory = commonNettyHttpServiceFactory;
this.sourceControlOperationRunner = sourceControlOperationRunner;
this.repositoryCleanupService = repositoryCleanupService;
this.userEncryptionAeadCipherService = userCredentialAeadCipherService;
this.dataStorageAeadCipherService = dataStorageAeadCipherService;
}

/**
Expand All @@ -170,10 +181,6 @@ protected void startUp() throws Exception {
Constants.Logging.COMPONENT_NAME,
Constants.Service.APP_FABRIC_HTTP));
List<ListenableFuture<State>> futuresList = new ArrayList<>();
FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
futuresList.add(namespaceCredentialProviderService.start());
}
futuresList.addAll(ImmutableList.of(
provisioningService.start(),
applicationLifecycleService.start(),
Expand All @@ -188,9 +195,18 @@ protected void startUp() throws Exception {
runRecordCounterService.start(),
runRecordTimeToLiveService.start(),
sourceControlOperationRunner.start(),
repositoryCleanupService.start()
repositoryCleanupService.start(),
userEncryptionAeadCipherService.start(),
dataStorageAeadCipherService.start()
));
Futures.allAsList(futuresList).get();
// namespaceCredentialProviderService depends on dataStorageAeadCipherService, so we wait for
// all of them to start first.
FeatureFlagsProvider featureFlagsProvider = new DefaultFeatureFlagsProvider(cConf);
if (Feature.NAMESPACED_SERVICE_ACCOUNTS.isEnabled(featureFlagsProvider)) {
namespaceCredentialProviderService.startAndWait();
}


// Create handler hooks
List<MetricsReporterHook> handlerHooks = handlerHookNames.stream()
Expand Down Expand Up @@ -249,6 +265,8 @@ protected void shutDown() throws Exception {
repositoryCleanupService.stopAndWait();
credentialProviderService.stopAndWait();
namespaceCredentialProviderService.stopAndWait();
userEncryptionAeadCipherService.stopAndWait();
dataStorageAeadCipherService.stopAndWait();
}

private Cancellable startHttpService(NettyHttpService httpService) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import io.cdap.cdap.proto.credential.CredentialIdentity;
import io.cdap.cdap.proto.id.CredentialIdentityId;
import io.cdap.cdap.proto.id.CredentialProfileId;
import io.cdap.cdap.security.spi.encryption.CipherOperationException;
import io.cdap.cdap.spi.data.StructuredTableContext;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
import io.cdap.cdap.spi.data.transaction.TransactionRunners;
Expand Down Expand Up @@ -70,7 +71,11 @@ public Collection<CredentialIdentityId> list(String namespace) throws IOExceptio
*/
public Optional<CredentialIdentity> get(CredentialIdentityId id) throws IOException {
return TransactionRunners.run(transactionRunner, context -> {
return identityStore.get(context, id);
try {
return identityStore.get(context, id);
} catch (CipherOperationException e) {
throw new IOException("Failed to decrypt identity", e);
}
}, IOException.class);
}

Expand All @@ -86,7 +91,7 @@ public Optional<CredentialIdentity> get(CredentialIdentityId id) throws IOExcept
public void create(CredentialIdentityId id, CredentialIdentity identity)
throws AlreadyExistsException, IOException, NotFoundException {
TransactionRunners.run(transactionRunner, context -> {
if (identityStore.get(context, id).isPresent()) {
if (identityStore.exists(context, id)) {
throw new AlreadyExistsException(String.format("Credential identity '%s:%s' already exists",
id.getNamespace(), id.getName()));
}
Expand All @@ -106,7 +111,7 @@ public void create(CredentialIdentityId id, CredentialIdentity identity)
public void update(CredentialIdentityId id, CredentialIdentity identity)
throws IOException, NotFoundException {
TransactionRunners.run(transactionRunner, context -> {
if (!identityStore.get(context, id).isPresent()) {
if (!identityStore.exists(context, id)) {
throw new NotFoundException(String.format("Credential identity '%s:%s' not found",
id.getNamespace(), id.getName()));
}
Expand All @@ -123,7 +128,7 @@ public void update(CredentialIdentityId id, CredentialIdentity identity)
*/
public void delete(CredentialIdentityId id) throws IOException, NotFoundException {
TransactionRunners.run(transactionRunner, context -> {
if (!identityStore.get(context, id).isPresent()) {
if (!identityStore.exists(context, id)) {
throw new NotFoundException(String.format("Credential identity '%s:%s' not found",
id.getNamespace(), id.getName()));
}
Expand All @@ -136,10 +141,14 @@ private void validateAndWriteIdentity(StructuredTableContext context, Credential
// Validate the referenced profile exists.
CredentialProfileId profileId = new CredentialProfileId(identity.getProfileNamespace(),
identity.getProfileName());
if (!profileStore.get(context, profileId).isPresent()) {
if (!profileStore.exists(context, profileId)) {
throw new NotFoundException(String.format("Credential profile '%s:%s' not found",
profileId.getNamespace(), profileId.getName()));
}
identityStore.write(context, id, identity);
try {
identityStore.write(context, id, identity);
} catch (CipherOperationException e) {
throw new IOException("Failed to encrypt identity", e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import io.cdap.cdap.proto.credential.CredentialProfile;
import io.cdap.cdap.proto.id.CredentialIdentityId;
import io.cdap.cdap.proto.id.CredentialProfileId;
import io.cdap.cdap.security.spi.encryption.CipherOperationException;
import io.cdap.cdap.security.spi.credential.CredentialProvider;
import io.cdap.cdap.security.spi.credential.ProfileValidationException;
import io.cdap.cdap.spi.data.transaction.TransactionRunner;
Expand Down Expand Up @@ -80,7 +81,11 @@ public Collection<CredentialProfileId> list(String namespace) throws IOException
*/
public Optional<CredentialProfile> get(CredentialProfileId id) throws IOException {
return TransactionRunners.run(transactionRunner, context -> {
return profileStore.get(context, id);
try {
return profileStore.get(context, id);
} catch (CipherOperationException e) {
throw new IOException("Failed to decrypt profile", e);
}
}, IOException.class);
}

Expand All @@ -97,11 +102,15 @@ public void create(CredentialProfileId id, CredentialProfile profile)
throws AlreadyExistsException, BadRequestException, IOException {
validateProfile(profile);
TransactionRunners.run(transactionRunner, context -> {
if (profileStore.get(context, id).isPresent()) {
if (profileStore.exists(context, id)) {
throw new AlreadyExistsException(String.format("Credential profile '%s:%s' already exists",
id.getNamespace(), id.getName()));
}
profileStore.write(context, id, profile);
try {
profileStore.write(context, id, profile);
} catch (CipherOperationException e) {
throw new IOException("Failed to encrypt profile", e);
}
}, AlreadyExistsException.class, IOException.class);
}

Expand All @@ -118,11 +127,15 @@ public void update(CredentialProfileId id, CredentialProfile profile)
throws BadRequestException, IOException, NotFoundException {
validateProfile(profile);
TransactionRunners.run(transactionRunner, context -> {
if (!profileStore.get(context, id).isPresent()) {
if (!profileStore.exists(context, id)) {
throw new NotFoundException(String.format("Credential profile '%s:%s' not found",
id.getNamespace(), id.getName()));
}
profileStore.write(context, id, profile);
try {
profileStore.write(context, id, profile);
} catch (CipherOperationException e) {
throw new IOException("Failed to encrypt profile", e);
}
}, IOException.class, NotFoundException.class);
}

Expand All @@ -137,7 +150,7 @@ public void update(CredentialProfileId id, CredentialProfile profile)
public void delete(CredentialProfileId id)
throws ConflictException, IOException, NotFoundException {
TransactionRunners.run(transactionRunner, context -> {
if (!profileStore.get(context, id).isPresent()) {
if (!profileStore.exists(context, id)) {
throw new NotFoundException(String.format("Credential profile '%s:%s' not found",
id.getNamespace(), id.getName()));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
import io.cdap.cdap.proto.credential.CredentialIdentity;
import io.cdap.cdap.proto.id.CredentialIdentityId;
import io.cdap.cdap.proto.id.CredentialProfileId;
import io.cdap.cdap.security.encryption.AeadCipherService;
import io.cdap.cdap.security.encryption.guice.AeadEncryptionModule;
import io.cdap.cdap.security.spi.encryption.CipherOperationException;
import io.cdap.cdap.spi.data.StructuredRow;
import io.cdap.cdap.spi.data.StructuredTable;
import io.cdap.cdap.spi.data.StructuredTableContext;
Expand All @@ -29,6 +32,7 @@
import io.cdap.cdap.spi.data.table.field.Range;
import io.cdap.cdap.store.StoreDefinition.CredentialProviderStore;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
Expand All @@ -38,14 +42,26 @@
import java.util.Spliterators;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import javax.inject.Inject;
import javax.inject.Named;

/**
* Storage for credential identities.
*/
public class CredentialIdentityStore {

private static final byte[] CREDENTIAL_IDENTITY_STORE_AD = CredentialIdentityStore.class
.getCanonicalName().getBytes(StandardCharsets.UTF_8);
private static final Gson GSON = new Gson();

private final AeadCipherService dataStorageCipher;

@Inject
public CredentialIdentityStore(@Named(AeadEncryptionModule.DATA_STORAGE_ENCRYPTION)
AeadCipherService dataStorageCipher) {
this.dataStorageCipher = dataStorageCipher;
}

/**
* Lists entries in the credential identity table for a given namespace.
*
Expand Down Expand Up @@ -84,6 +100,24 @@ public Collection<CredentialIdentityId> listForProfile(StructuredTableContext co
}
}

/**
* Returns whether an entry exists in the identity table.
*
* @param context The transaction context to use.
* @param id The identity reference to fetch.
* @return Whether the credential identity exists.
*/
public boolean exists(StructuredTableContext context, CredentialIdentityId id)
throws IOException {
StructuredTable table = context.getTable(CredentialProviderStore.CREDENTIAL_IDENTITIES);
Collection<Field<?>> key = Arrays.asList(
Fields.stringField(CredentialProviderStore.NAMESPACE_FIELD,
id.getNamespace()),
Fields.stringField(CredentialProviderStore.IDENTITY_NAME_FIELD,
id.getName()));
return table.read(key).isPresent();
}

/**
* Fetch an entry from the identity table.
*
Expand All @@ -93,15 +127,21 @@ public Collection<CredentialIdentityId> listForProfile(StructuredTableContext co
* @throws IOException If any failure reading from storage occurs.
*/
public Optional<CredentialIdentity> get(StructuredTableContext context, CredentialIdentityId id)
throws IOException {
throws CipherOperationException, IOException {
StructuredTable table = context.getTable(CredentialProviderStore.CREDENTIAL_IDENTITIES);
Collection<Field<?>> key = Arrays.asList(
Fields.stringField(CredentialProviderStore.NAMESPACE_FIELD,
id.getNamespace()),
Fields.stringField(CredentialProviderStore.IDENTITY_NAME_FIELD,
id.getName()));
return table.read(key).map(row -> GSON.fromJson(row
.getString(CredentialProviderStore.IDENTITY_DATA_FIELD), CredentialIdentity.class));
Optional<StructuredRow> row = table.read(key);
if (!row.isPresent()) {
return Optional.empty();
}
String decryptedData = new String(
dataStorageCipher.decrypt(row.get().getBytes(CredentialProviderStore.IDENTITY_DATA_FIELD),
CREDENTIAL_IDENTITY_STORE_AD), StandardCharsets.UTF_8);
return Optional.of(GSON.fromJson(decryptedData, CredentialIdentity.class));
}

/**
Expand All @@ -113,16 +153,17 @@ public Optional<CredentialIdentity> get(StructuredTableContext context, Credenti
* @throws IOException If any failure reading from storage occurs.
*/
public void write(StructuredTableContext context, CredentialIdentityId id,
CredentialIdentity identity) throws IOException {
CredentialIdentity identity) throws CipherOperationException, IOException {
StructuredTable identityTable =
context.getTable(CredentialProviderStore.CREDENTIAL_IDENTITIES);
Collection<Field<?>> row = Arrays.asList(
Fields.stringField(CredentialProviderStore.NAMESPACE_FIELD,
id.getNamespace()),
Fields.stringField(CredentialProviderStore.IDENTITY_NAME_FIELD,
id.getName()),
Fields.stringField(CredentialProviderStore.IDENTITY_DATA_FIELD,
GSON.toJson(identity)),
Fields.bytesField(CredentialProviderStore.IDENTITY_DATA_FIELD,
dataStorageCipher.encrypt(GSON.toJson(identity).getBytes(StandardCharsets.UTF_8),
CREDENTIAL_IDENTITY_STORE_AD)),
Fields.stringField(CredentialProviderStore.IDENTITY_PROFILE_INDEX_FIELD,
toProfileIndex(identity.getProfileNamespace(), identity.getProfileName())));
identityTable.upsert(row);
Expand Down
Loading

0 comments on commit 838fb75

Please sign in to comment.