-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add additional timestamp sources #298
base: main
Are you sure you want to change the base?
Conversation
commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java
Outdated
Show resolved
Hide resolved
commons/src/main/java/io/aiven/kafka/connect/common/config/Path.java
Outdated
Show resolved
Hide resolved
ea0cd33
to
74049bb
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry it took so long to get back to this review.
commons/src/main/java/io/aiven/kafka/connect/common/config/TimestampSource.java
Outdated
Show resolved
Hide resolved
|
||
* @return this | ||
*/ | ||
public Builder configuration(final String configuration) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This method does not make sens to me.
There should be a setType(Type type)
method and a setArgs(String... args)
method. Worry about configuration packing outside of this class. I know that you are thinking that you can put HEADER:field
into a configuration but do the unpacking outside of this class, then you don't have to jump through hoops to change the split pattern or similar gyrations.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
happy to separate the type and parameter, but its not varargs
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think its probably better to allow all of the additional class parameters to be able to be specified via additional parameters - which become available with #300. That way its explicit, documented and extensible
this.zoneId = zoneId; | ||
this.type = type; | ||
this.dataExtractor = dataExtractor; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are no checks for any of these being null.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There doesn't need to be. The caller validates. Adding null checks at all levels just blurs the concerns IMO
public ZonedDateTime time(SinkRecord record) { | ||
return fromRawTime(dataExtractor.extractDataFrom(record)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you make this an abstract method then you can simply override it in your implementations and not pollute this class with methods that are only used by one instance. This will also make the code easier to read and maintain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This allows reuse. Its can still be overridden if a subclass needs it but doesn't require that. What is here is a default, available to be
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are two separate concerns here. The location of the date like field, and the conversion of that to a ZonedDateTime - e.g. 982b347 adds java.util.Date as a source conversion
This isn't polluting the api, its promoting reuse, and when extending this library to allow external code to use it, I believe its best to enable extension code to access to common and shared concepts, like implicit type conversion
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not a class that just does the conversions. I think that classes should try to do one thing and do it very well. A class to convert time formats means they are reusable elsewhere in the chain.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
so you want o have the Extraction as one component in the TimstampSource?
So roughly an extractor and a convertor?
that not the way that the existing TimestampSources work
e.g.
for event
@Override
public ZonedDateTime time(final SinkRecord record) {
return ZonedDateTime.ofInstant(Instant.ofEpochMilli(record.timestamp()), zoneId);
}
does 3 things locate, convert and add zoneId
Do you want to change the split of the functionality for all of these existing cases? or have I missed what you are proposing/unhappy about
final String params = parts.length > 1 ? parts[1] : null; | ||
try { | ||
final Class<?> clazz = Class.forName(className); | ||
return (TimestampSource) clazz.getConstructor(String.class, ZoneId.class).newInstance(params, zoneId); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The constructor argument seems backwards. ZoneId is required for all instances so shouldn't that be first and then the params?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
by definition both are required parameters in the signature
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think that its probably better to move all of the configuration to separate properties, once #300 is merged. That allows for external classes to add to the configuration parameters, and read them. That way its not limited in the configuration, and self documenting
Thoughts?
commons/src/main/java/io/aiven/kafka/connect/common/config/extractors/HeaderValueExtractor.java
Show resolved
Hide resolved
from a header value from a data field via a custom extractors Remove a few simple classes and make a DataExtractor to read things from the `sinkRecord` and few tidyups
… (found with avro logical type schema annotation https://avro.apache.org/docs/1.8.2/spec.html#Logical+Types)
d59ca8e
to
9318097
Compare
from a header value
from a data field
via a custom extractors