Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Create plugins for opportunity & opportunity_role #25

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
import org.embulk.input.marketo.delegate.LeadWithListInputPlugin;
import org.embulk.input.marketo.delegate.LeadWithProgramInputPlugin;
import org.embulk.input.marketo.delegate.ListInputPlugin;
import org.embulk.input.marketo.delegate.OpportunityInputPlugin;
import org.embulk.input.marketo.delegate.OpportunityRoleInputPlugin;
import org.embulk.input.marketo.delegate.ProgramInputPlugin;
import org.embulk.input.marketo.delegate.ProgramMembersBulkExtractInputPlugin;
import org.embulk.input.marketo.rest.MarketoRestClient;
Expand All @@ -39,7 +41,9 @@ public interface PluginTask
ProgramMembersBulkExtractInputPlugin.PluginTask,
ListInputPlugin.PluginTask,
ActivityTypeInputPlugin.PluginTask,
FolderInputPlugin.PluginTask
FolderInputPlugin.PluginTask,
OpportunityInputPlugin.PluginTask,
OpportunityRoleInputPlugin.PluginTask
{
@Config("target")
Target getTarget();
Expand Down Expand Up @@ -88,7 +92,9 @@ public enum Target
PROGRAM_MEMBERS(new ProgramMembersBulkExtractInputPlugin()),
LIST(new ListInputPlugin()),
ACTIVITY_TYPE(new ActivityTypeInputPlugin()),
FOLDER(new FolderInputPlugin());
FOLDER(new FolderInputPlugin()),
OPPORTUNITY(new OpportunityInputPlugin()),
OPPORTUNITY_ROLE(new OpportunityRoleInputPlugin());

private final RestClientInputPluginDelegate restClientInputPluginDelegate;

Expand Down
5 changes: 5 additions & 0 deletions src/main/java/org/embulk/input/marketo/MarketoService.java
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,9 @@ public interface MarketoService
File extractProgramMembers(String exportID);

Iterable<ObjectNode> getFolders(Optional<Long> rootId, RootType rootType, Integer maxDepth, Optional<String> workspace);

Iterable<ObjectNode> getOpportunities(String filterType, Set<String> filterValues);

Iterable<ObjectNode> getOpportunityRoles(String filterType, Set<String> filterValues);
}

23 changes: 23 additions & 0 deletions src/main/java/org/embulk/input/marketo/MarketoServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -286,4 +286,27 @@ public Iterable<ObjectNode> getFolders(Optional<Long> rootId, RootType rootType,
Optional<String> root = rootId.isPresent() ? Optional.of(String.format("{\"id\": %d, \"type\": \"%s\"}", rootId.get(), type)) : Optional.empty();
return marketoRestClient.getFolders(root, maxDepth, workspace);
}

@Override
public Iterable<ObjectNode> getOpportunities(String filterType, Set<String> filterValues)
{
// Marketo allows maximum 300 (comma separated) filterValues per request. Split the input and process by chunk.
List<List<String>> partitionedFilterValues = Lists.partition(Lists.newArrayList(filterValues), 300);
return MarketoUtils.flatMap(partitionedFilterValues, (part) -> {
String strFilterValues = StringUtils.join(part, ",");
return marketoRestClient.getOpportunities(filterType, strFilterValues);
});
}

@Override
public Iterable<ObjectNode> getOpportunityRoles(String filterType, Set<String> filterValues)
{
// Marketo allows maximum 300 (comma separated) filterValues per request. Split the input and process by chunk.
List<List<String>> partitionedFilterValues = Lists.partition(Lists.newArrayList(filterValues), 300);
return MarketoUtils.flatMap(partitionedFilterValues, (part) -> {
String strFilterValues = StringUtils.join(part, ",");
return marketoRestClient.getOpportunityRoles(filterType, strFilterValues);
});
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package org.embulk.input.marketo.delegate;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.FluentIterable;
import org.apache.commons.lang3.StringUtils;
import org.embulk.base.restclient.ServiceResponseMapper;
import org.embulk.base.restclient.jackson.JacksonServiceResponseMapper;
import org.embulk.base.restclient.record.ServiceRecord;
import org.embulk.base.restclient.record.ValueLocator;
import org.embulk.config.ConfigException;
import org.embulk.input.marketo.MarketoService;
import org.embulk.input.marketo.MarketoUtils;
import org.embulk.spi.type.Types;
import org.embulk.util.config.Config;
import org.embulk.util.config.ConfigDefault;

import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OpportunityInputPlugin extends MarketoBaseInputPluginDelegate<OpportunityInputPlugin.PluginTask>
{
public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask
{
@Config("opportunity_filter_type")
@ConfigDefault("\"\"")
String getOpportunityFilterType();

@Config("opportunity_filter_values")
@ConfigDefault("null")
Optional<String> getOpportunityFilterValues();
}

public OpportunityInputPlugin()
{
}

@Override
public void validateInputTask(PluginTask task)
{
super.validateInputTask(task);
if (StringUtils.isBlank(task.getOpportunityFilterType())) {
throw new ConfigException("`opportunity_filter_type` cannot be empty");
}
if (refineFilterValues(task.getOpportunityFilterValues().orElse("")).isEmpty()) {
throw new ConfigException("`opportunity_filter_values` cannot contain empty values only");
}
}

private Set<String> refineFilterValues(String filterValues)
{
return Stream.of(StringUtils.split(filterValues, ",")).map(StringUtils::trimToEmpty).filter(StringUtils::isNotBlank).collect(Collectors.toSet());
}

@Override
protected Iterator<ServiceRecord> getServiceRecords(MarketoService marketoService, PluginTask task)
{
Set<String> refinedValues = refineFilterValues(task.getOpportunityFilterValues().get());
Iterable<ObjectNode> responseObj = marketoService.getOpportunities(task.getOpportunityFilterType(), refinedValues);
return FluentIterable.from(responseObj).transform(MarketoUtils.TRANSFORM_OBJECT_TO_JACKSON_SERVICE_RECORD_FUNCTION).iterator();
}

@Override
public ServiceResponseMapper<? extends ValueLocator> buildServiceResponseMapper(PluginTask task)
{
JacksonServiceResponseMapper.Builder builder = JacksonServiceResponseMapper.builder();
builder.add("marketoGUID", Types.STRING)
.add("createdAt", Types.TIMESTAMP, MarketoUtils.MARKETO_DATE_TIME_FORMAT)
.add("updatedAt", Types.TIMESTAMP, MarketoUtils.MARKETO_DATE_TIME_FORMAT)
.add("externalOpportunityId", Types.STRING);
return builder.build();
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package org.embulk.input.marketo.delegate;

import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.FluentIterable;
import org.apache.commons.lang3.StringUtils;
import org.embulk.base.restclient.ServiceResponseMapper;
import org.embulk.base.restclient.jackson.JacksonServiceResponseMapper;
import org.embulk.base.restclient.record.ServiceRecord;
import org.embulk.base.restclient.record.ValueLocator;
import org.embulk.config.ConfigException;
import org.embulk.input.marketo.MarketoService;
import org.embulk.input.marketo.MarketoUtils;
import org.embulk.spi.type.Types;
import org.embulk.util.config.Config;
import org.embulk.util.config.ConfigDefault;

import java.util.Iterator;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class OpportunityRoleInputPlugin extends MarketoBaseInputPluginDelegate<OpportunityRoleInputPlugin.PluginTask>
{
public interface PluginTask extends MarketoBaseInputPluginDelegate.PluginTask
{
@Config("opportunity_role_filter_type")
@ConfigDefault("\"\"")
String getOpportunityRoleFilterType();

@Config("opportunity_role_filter_values")
@ConfigDefault("null")
Optional<String> getOpportunityRoleFilterValues();
}

public OpportunityRoleInputPlugin()
{
}

@Override
public void validateInputTask(PluginTask task)
{
super.validateInputTask(task);
if (StringUtils.isBlank(task.getOpportunityRoleFilterType())) {
throw new ConfigException("`opportunity_role_filter_type` cannot be empty");
}
if (refineFilterValues(task.getOpportunityRoleFilterValues().orElse("")).isEmpty()) {
throw new ConfigException("`opportunity_role_filter_values` cannot contain empty values only");
}
}

private Set<String> refineFilterValues(String filterValues)
{
return Stream.of(StringUtils.split(filterValues, ",")).map(StringUtils::trimToEmpty).filter(StringUtils::isNotBlank).collect(Collectors.toSet());
}

@Override
protected Iterator<ServiceRecord> getServiceRecords(MarketoService marketoService, PluginTask task)
{
Set<String> refinedValues = refineFilterValues(task.getOpportunityRoleFilterValues().get());
Iterable<ObjectNode> responseObj = marketoService.getOpportunityRoles(task.getOpportunityRoleFilterType(), refinedValues);
return FluentIterable.from(responseObj).transform(MarketoUtils.TRANSFORM_OBJECT_TO_JACKSON_SERVICE_RECORD_FUNCTION).iterator();
}

@Override
public ServiceResponseMapper<? extends ValueLocator> buildServiceResponseMapper(PluginTask task)
{
JacksonServiceResponseMapper.Builder builder = JacksonServiceResponseMapper.builder();
builder.add("marketoGUID", Types.STRING)
.add("externalOpportunityId", Types.STRING)
.add("leadId", Types.LONG)
.add("role", Types.STRING)
.add("isPrimary", Types.BOOLEAN);
return builder.build();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ public enum MarketoRESTEndpoint
START_PROGRAM_MEMBERS_EXPORT_JOB("/bulk/v1/program/members/export/${export_id}/enqueue.json"),
GET_PROGRAM_MEMBERS_EXPORT_STATUS("/bulk/v1/program/members/export/${export_id}/status.json"),
GET_PROGRAM_MEMBERS_EXPORT_RESULT("/bulk/v1/program/members/export/${export_id}/file.json"),
GET_FOLDERS("/rest/asset/v1/folders.json");
GET_FOLDERS("/rest/asset/v1/folders.json"),
GET_OPPORTUNITIES("/rest/v1/opportunities.json"),
GET_OPPORTUNITY_ROLES("/rest/v1/opportunities/roles.json");
private final String endpoint;

MarketoRESTEndpoint(String endpoint)
Expand Down
20 changes: 20 additions & 0 deletions src/main/java/org/embulk/input/marketo/rest/MarketoRestClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -643,4 +643,24 @@ public RecordPagingIterable<ObjectNode> getFolders(Optional<String> root, int ma
}
return getRecordWithOffsetPagination(endPoint + MarketoRESTEndpoint.GET_FOLDERS.getEndpoint(), builder.build(), ObjectNode.class);
}

public RecordPagingIterable<ObjectNode> getOpportunities(String filterType, String filterValue)
{
Multimap<String, String> params = new ImmutableListMultimap
.Builder<String, String>()
.put("filterType", StringUtils.trimToEmpty(filterType))
.put("filterValues", StringUtils.trimToEmpty(filterValue))
.put(BATCH_SIZE, MAX_BATCH_SIZE).build();
return getRecordWithTokenPagination(endPoint + MarketoRESTEndpoint.GET_OPPORTUNITIES.getEndpoint(), params, ObjectNode.class);
}

public RecordPagingIterable<ObjectNode> getOpportunityRoles(String filterType, String filterValue)
{
Multimap<String, String> params = new ImmutableListMultimap
.Builder<String, String>()
.put("filterType", StringUtils.trimToEmpty(filterType))
.put("filterValues", StringUtils.trimToEmpty(filterValue))
.put(BATCH_SIZE, MAX_BATCH_SIZE).build();
return getRecordWithTokenPagination(endPoint + MarketoRESTEndpoint.GET_OPPORTUNITY_ROLES.getEndpoint(), params, ObjectNode.class);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package org.embulk.input.marketo.delegate;

import com.fasterxml.jackson.databind.JavaType;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.embulk.EmbulkTestRuntime;
import org.embulk.base.restclient.ServiceResponseMapper;
import org.embulk.base.restclient.record.RecordImporter;
import org.embulk.base.restclient.record.ValueLocator;
import org.embulk.config.ConfigException;
import org.embulk.config.ConfigLoader;
import org.embulk.config.ConfigSource;
import org.embulk.input.marketo.rest.MarketoRestClient;
import org.embulk.input.marketo.rest.RecordPagingIterable;
import org.embulk.spi.PageBuilder;
import org.embulk.spi.Schema;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.mockito.ArgumentCaptor;

import java.io.IOException;
import java.util.List;

import static org.embulk.input.marketo.MarketoUtilsTest.CONFIG_MAPPER;
import static org.embulk.input.marketo.delegate.OpportunityInputPlugin.PluginTask;
import static org.junit.Assert.assertArrayEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.anyString;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class OpportunityInputPluginTest
{
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

@Rule
public EmbulkTestRuntime embulkTestRuntime = new EmbulkTestRuntime();

private OpportunityInputPlugin opportunityInputPlugin;

private ConfigSource configSource;

private MarketoRestClient mockMarketoRestClient;

@Before
public void setUp() throws Exception
{
opportunityInputPlugin = spy(new OpportunityInputPlugin());
ConfigLoader configLoader = embulkTestRuntime.getInjector().getInstance(ConfigLoader.class);
configSource = configLoader.fromYaml(this.getClass().getResourceAsStream("/config/opportunity.yaml"));
mockMarketoRestClient = mock(MarketoRestClient.class);
doReturn(mockMarketoRestClient).when(opportunityInputPlugin).createMarketoRestClient(any(PluginTask.class));
}

@Test
public void testValidPluginTask()
{
PluginTask pluginTask = mapTask(configSource);
opportunityInputPlugin.validateInputTask(pluginTask);
}

@Test
public void testOpportunityFilterTypeError()
{
PluginTask pluginTask = mapTask(configSource.set("opportunity_filter_type", ""));
Assert.assertThrows(ConfigException.class, () -> opportunityInputPlugin.validateInputTask(pluginTask));
}

@Test
public void testEmptyStringFilterValues()
{
PluginTask pluginTask = mapTask(configSource.set("opportunity_filter_values", ""));
Assert.assertThrows(ConfigException.class, () -> opportunityInputPlugin.validateInputTask(pluginTask));
}

@Test
public void testAllEmptyStringFilterValues()
{
PluginTask pluginTask = mapTask(configSource.set("opportunity_filter_values", ",, , "));
Assert.assertThrows(ConfigException.class, () -> opportunityInputPlugin.validateInputTask(pluginTask));
}

@Test
public void testRun() throws IOException
{
RecordPagingIterable<ObjectNode> mockRecordPagingIterable = mock(RecordPagingIterable.class);
JavaType javaType = OBJECT_MAPPER.getTypeFactory().constructParametrizedType(List.class, List.class, ObjectNode.class);
List<ObjectNode> objectNodeList = OBJECT_MAPPER.readValue(this.getClass().getResourceAsStream("/fixtures/opportunity_response.json"), javaType);
when(mockRecordPagingIterable.iterator()).thenReturn(objectNodeList.iterator());
when(mockMarketoRestClient.getOpportunities(anyString(), anyString())).thenReturn(mockRecordPagingIterable);

PluginTask task = mapTask(configSource);
ServiceResponseMapper<? extends ValueLocator> mapper = opportunityInputPlugin.buildServiceResponseMapper(task);
RecordImporter recordImporter = mapper.createRecordImporter();
PageBuilder mockPageBuilder = mock(PageBuilder.class);
opportunityInputPlugin.ingestServiceData(task, recordImporter, 1, mockPageBuilder);
verify(mockMarketoRestClient, times(1)).getOpportunities(anyString(), anyString());

Schema schema = mapper.getEmbulkSchema();
ArgumentCaptor<String> stringArgumentCaptor = ArgumentCaptor.forClass(String.class);
verify(mockPageBuilder, times(3)).setString(eq(schema.lookupColumn("externalOpportunityId")), stringArgumentCaptor.capture());
List<String> allValues = stringArgumentCaptor.getAllValues();
assertArrayEquals(new String[]{"externalOpportunityId_1", "externalOpportunityId_2", "externalOpportunityId_3"}, allValues.toArray());
}

private PluginTask mapTask(ConfigSource config)
{
return CONFIG_MAPPER.map(config, PluginTask.class);
}
}

Loading
Loading