Skip to content

Commit

Permalink
remove some deprecated methods, improve scala style, move Java relate… (
Browse files Browse the repository at this point in the history
#156)

* remove some deprecated methods, improve scala style, move Java related classes to flinkx package

* update release process documentation as per new sbt project structure
  • Loading branch information
novakov-alexey authored Oct 6, 2024
1 parent 486aa60 commit 32867e4
Show file tree
Hide file tree
Showing 22 changed files with 108 additions and 224 deletions.
12 changes: 8 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -353,13 +353,17 @@ P.S. this flag can be deprecated in future when most of the users migrate to the

## Release

Define two environment variables before starting SBT shell:
Create SBT file at ~/.sbt/1.0/sonatype.sbt with the following content:

```bash
export SONATYPE_USERNAME=<your user name for Sonatype>
export SONATYPE_PASSWORD=<your password for Sonatype>
credentials += Credentials("Sonatype Nexus Repository Manager",
"s01.oss.sonatype.org",
"<access token: user name>",
"<access token: password>")
```

replace values with your access token user name and password.

Release new version:

```bash
Expand All @@ -369,7 +373,7 @@ sh release.sh
Increment to next SNAPSHOT version and push to Git server:

```bash
RELEASE_PUBLISH=true sbt 'release with-defaults'
RELEASE_PUBLISH=true sbt "; project scala-api; release with-defaults"
```

## License
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.apache.flink.util.InstantiationUtil;

import java.io.IOException;
import java.util.Objects;
import java.util.Optional;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@ import magnolia1.{CaseClass, Magnolia, SealedTrait}
import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.typeinfo.TypeInformation

import scala.annotation.tailrec
import scala.collection.mutable
import scala.language.experimental.macros
import scala.reflect._
import scala.reflect.runtime.universe.{Try => _, _}
import scala.util.{Failure, Success, Try}

private[api] trait LowPrioImplicits {
type Typeclass[T] = TypeInformation[T]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import scala.reflect.runtime.{currentMirror => cm}
import scala.reflect.runtime.universe
import scala.reflect.runtime.universe._

import org.apache.flinkx.api.serializers.drop

private[serializer] trait ConstructorCompat {

@nowarn("msg=(eliminated by erasure)|(explicit array)")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {
def minBy(field: String): DataStream[T] = aggregate(AggregationType.MINBY, field)

private def aggregate(aggregationType: AggregationType, field: String): DataStream[T] = {
val position = fieldNames2Indices(getInputType(), Array(field))(0)
val position = fieldNames2Indices(getInputType, Array(field))(0)
aggregate(aggregationType, position)
}

Expand Down Expand Up @@ -522,5 +522,5 @@ class AllWindowedStream[T, W <: Window](javaStream: JavaAllWStream[T, W]) {

/** Gets the output type.
*/
private def getInputType(): TypeInformation[T] = javaStream.getInputType
private def getInputType: TypeInformation[T] = javaStream.getInputType
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,14 @@ import org.apache.commons.io.IOUtils
import java.lang.invoke.{MethodHandleInfo, SerializedLambda}
import java.lang.reflect.{Field, Modifier}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream}
import scala.collection.mutable.{Map, Set, Stack}
import org.apache.commons.lang3.{ClassUtils, JavaVersion, SystemUtils}
import org.apache.commons.lang3.ClassUtils
import org.apache.flink.shaded.asm9.org.objectweb.asm.{ClassReader, ClassVisitor, Handle, MethodVisitor, Type}
import org.apache.flink.shaded.asm9.org.objectweb.asm.Opcodes._
import org.apache.flink.shaded.asm9.org.objectweb.asm.tree.{ClassNode, MethodNode}
import org.apache.flink.util.{FlinkException, InstantiationUtil}
import org.apache.flink.util.FlinkException
import org.slf4j.LoggerFactory

import scala.collection.mutable
import scala.jdk.CollectionConverters._

/** A cleaner that renders closures serializable if they can be done so safely.
Expand Down Expand Up @@ -82,12 +82,12 @@ object ClosureCleaner {
/** Return a list of classes that represent closures enclosed in the given closure object.
*/
private def getInnerClosureClasses(obj: AnyRef): List[Class[_]] = {
val seen = Set[Class[_]](obj.getClass)
val stack = Stack[Class[_]](obj.getClass)
while (!stack.isEmpty) {
val seen = mutable.Set[Class[_]](obj.getClass)
val stack = mutable.Stack[Class[_]](obj.getClass)
while (stack.nonEmpty) {
val cr = getClassReader(stack.pop())
if (cr != null) {
val set = Set.empty[Class[_]]
val set = mutable.Set.empty[Class[_]]
cr.accept(new InnerClosureFinder(set), 0)
for (cls <- set -- seen) {
seen += cls
Expand All @@ -99,14 +99,14 @@ object ClosureCleaner {
}

/** Initializes the accessed fields for outer classes and their super classes. */
private def initAccessedFields(accessedFields: Map[Class[_], Set[String]], outerClasses: Seq[Class[_]]): Unit = {
private def initAccessedFields(accessedFields: mutable.Map[Class[_], mutable.Set[String]], outerClasses: Seq[Class[_]]): Unit = {
for (cls <- outerClasses) {
var currentClass = cls
assert(currentClass != null, "The outer class can't be null.")

while (currentClass != null) {
accessedFields(currentClass) = Set.empty[String]
currentClass = currentClass.getSuperclass()
accessedFields(currentClass) = mutable.Set.empty[String]
currentClass = currentClass.getSuperclass
}
}
}
Expand All @@ -116,7 +116,7 @@ object ClosureCleaner {
outerClass: Class[_],
clone: AnyRef,
obj: AnyRef,
accessedFields: Map[Class[_], Set[String]]
accessedFields: mutable.Map[Class[_], mutable.Set[String]]
): Unit = {
for (fieldName <- accessedFields(outerClass)) {
val field = outerClass.getDeclaredField(fieldName)
Expand All @@ -131,7 +131,7 @@ object ClosureCleaner {
parent: AnyRef,
obj: AnyRef,
outerClass: Class[_],
accessedFields: Map[Class[_], Set[String]]
accessedFields: mutable.Map[Class[_], mutable.Set[String]]
): AnyRef = {
val clone = instantiateClass(outerClass, parent)

Expand All @@ -140,7 +140,7 @@ object ClosureCleaner {

while (currentClass != null) {
setAccessedFields(currentClass, clone, obj, accessedFields)
currentClass = currentClass.getSuperclass()
currentClass = currentClass.getSuperclass
}

clone
Expand All @@ -159,7 +159,7 @@ object ClosureCleaner {
* whether to clean enclosing closures transitively
*/
def clean(closure: AnyRef, checkSerializable: Boolean = true, cleanTransitively: Boolean = true): Unit = {
clean(closure, checkSerializable, cleanTransitively, Map.empty)
clean(closure, checkSerializable, cleanTransitively, mutable.Map.empty)
}

def scalaClean[T <: AnyRef](fun: T, checkSerializable: Boolean = true, cleanTransitively: Boolean = true): T = {
Expand Down Expand Up @@ -204,7 +204,7 @@ object ClosureCleaner {
func: AnyRef,
checkSerializable: Boolean,
cleanTransitively: Boolean,
accessedFields: Map[Class[_], Set[String]]
accessedFields: mutable.Map[Class[_], mutable.Set[String]]
): Unit = {

// indylambda check. Most likely to be the case with 2.12, 2.13
Expand Down Expand Up @@ -447,7 +447,7 @@ object ClosureCleaner {

private def instantiateClass(cls: Class[_], enclosingObject: AnyRef): AnyRef = {
// Use reflection to instantiate object without calling constructor
val rf = sun.reflect.ReflectionFactory.getReflectionFactory()
val rf = sun.reflect.ReflectionFactory.getReflectionFactory
val parentCtor = classOf[Object].getDeclaredConstructor()
val newCtor = rf.newConstructorForSerialization(cls, parentCtor)
val obj = newCtor.newInstance().asInstanceOf[AnyRef]
Expand Down Expand Up @@ -627,14 +627,14 @@ object IndylambdaScalaClosures {
def findAccessedFields(
lambdaProxy: SerializedLambda,
lambdaClassLoader: ClassLoader,
accessedFields: Map[Class[_], Set[String]],
accessedFields: mutable.Map[Class[_], mutable.Set[String]],
findTransitively: Boolean
): Unit = {

// We may need to visit the same class multiple times for different methods on it, and we'll
// need to lookup by name. So we use ASM's Tree API and cache the ClassNode/MethodNode.
val classInfoByInternalName = Map.empty[String, (Class[_], ClassNode)]
val methodNodeById = Map.empty[MethodIdentifier[_], MethodNode]
val classInfoByInternalName = mutable.Map.empty[String, (Class[_], ClassNode)]
val methodNodeById = mutable.Map.empty[MethodIdentifier[_], MethodNode]
def getOrUpdateClassInfo(classInternalName: String): (Class[_], ClassNode) = {
val classInfo = classInfoByInternalName.getOrElseUpdate(
classInternalName, {
Expand Down Expand Up @@ -672,19 +672,19 @@ object IndylambdaScalaClosures {
// inner closure
// we need to track calls from "inner closure" to outer classes relative to it (class T, A, B)
// to better find and track field accesses.
val trackedClassInternalNames = Set[String](implClassInternalName)
val trackedClassInternalNames = mutable.Set[String](implClassInternalName)

// Depth-first search for inner closures and track the fields that were accessed in them.
// Start from the lambda body's implementation method, follow method invocations
val visited = Set.empty[MethodIdentifier[_]]
val stack = Stack[MethodIdentifier[_]](implMethodId)
val visited = mutable.Set.empty[MethodIdentifier[_]]
val stack = mutable.Stack[MethodIdentifier[_]](implMethodId)
def pushIfNotVisited(methodId: MethodIdentifier[_]): Unit = {
if (!visited.contains(methodId)) {
stack.push(methodId)
}
}

while (!stack.isEmpty) {
while (stack.nonEmpty) {
val currentId = stack.pop()
visited += currentId

Expand Down Expand Up @@ -816,10 +816,10 @@ private case class MethodIdentifier[T](cls: Class[T], name: String, desc: String
* a set of visited methods to avoid cycles
*/
private class FieldAccessFinder(
fields: Map[Class[_], Set[String]],
fields: mutable.Map[Class[_], mutable.Set[String]],
findTransitively: Boolean,
specificMethod: Option[MethodIdentifier[_]] = None,
visitedMethods: Set[MethodIdentifier[_]] = Set.empty
visitedMethods: mutable.Set[MethodIdentifier[_]] = mutable.Set.empty
) extends ClassVisitor(ASM9) {

override def visitMethod(
Expand Down Expand Up @@ -868,7 +868,7 @@ private class FieldAccessFinder(
ClosureCleaner
.getClassReader(currentClass)
.accept(new FieldAccessFinder(fields, findTransitively, Some(m), visitedMethods), 0)
currentClass = currentClass.getSuperclass()
currentClass = currentClass.getSuperclass
}
}
}
Expand All @@ -878,7 +878,7 @@ private class FieldAccessFinder(
}
}

private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM9) {
private class InnerClosureFinder(output: mutable.Set[Class[_]]) extends ClassVisitor(ASM9) {
var myName: String = null

// TODO: Recursively find inner closures that we indirectly reference, e.g.
Expand Down Expand Up @@ -908,7 +908,7 @@ private class InnerClosureFinder(output: Set[Class[_]]) extends ClassVisitor(ASM
override def visitMethodInsn(op: Int, owner: String, name: String, desc: String, itf: Boolean): Unit = {
val argTypes = Type.getArgumentTypes(desc)
if (
op == INVOKESPECIAL && name == "<init>" && argTypes.length > 0
op == INVOKESPECIAL && name == "<init>" && argTypes.nonEmpty
&& argTypes(0).toString.startsWith("L") // is it an object?
&& argTypes(0).getInternalName == myName
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,8 @@ class ConnectedStreams[IN1, IN2](javaStream: JavaCStream[IN1, IN2]) {
val cleanFun2 = clean(fun2)

val flatMapper = new CoFlatMapFunction[IN1, IN2, R] {
def flatMap1(value: IN1, out: Collector[R]) = { cleanFun1(value).foreach(out.collect _) }
def flatMap2(value: IN2, out: Collector[R]) = { cleanFun2(value).foreach(out.collect _) }
def flatMap1(value: IN1, out: Collector[R]): Unit = { cleanFun1(value).foreach(out.collect) }
def flatMap2(value: IN2, out: Collector[R]): Unit = { cleanFun2(value).foreach(out.collect) }
}

flatMap(flatMapper)
Expand Down
Loading

0 comments on commit 32867e4

Please sign in to comment.