-
Notifications
You must be signed in to change notification settings - Fork 222
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
提供实时任务限流&执行状态可视化功能 #397
Comments
flink 内实现限流功能参考为了在 Flink 作业运行时动态设置 updateRate 方法,你需要一个机制来接收外部的配置更新,并将这些更新应用到正在运行的任务实例中。这可以通过多种方式实现,下面我将详细说明一种常见的方法:使用广播变量和 Akka 的 Actor 系统。 使用广播变量和 Akka Actor 系统
首先,在你的主程序(即提交 Flink Job 的地方)中,你需要创建一个广播流,用于发送最新的限流配置给所有并行任务实例。你可以使用一个特殊的 SourceFunction 来生成包含新速率值的消息,并将其与你的主要数据流合并。 // 创建一个广播流,用于发送新的限流配置
DataStream<Double> rateConfigStream = env.addSource(new RateConfigSource())
.broadcast();
// 主要的数据流
DataStream<String> mainDataStream = ...;
// 将两者连接起来
ConnectedStreams<String, Double> connectedStreams = mainDataStream.connect(rateConfigStream);
// 应用自定义的 CoFlatMapFunction 或 RichCoFlatMapFunction 处理逻辑
connectedStreams.flatMap(new DynamicRateControlledFunction());
public class DynamicRateControlledFunction extends RichCoFlatMapFunction<String, Double, String> {
private transient RateLimiter rateLimiter;
private transient BroadcastState<String, Double> broadcastState;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
// 获取广播状态
ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("rate", Double.class);
broadcastState = getRuntimeContext().getBroadcastState(descriptor);
// 初始化默认限流器
double initialRate = broadcastState.get("rate") != null ? broadcastState.get("rate") : 10.0;
rateLimiter = RateLimiter.create(initialRate); // 假设每秒允许10个事件
}
@Override
public void flatMap1(String value, Collector<String> out) throws Exception {
if (rateLimiter.tryAcquire()) {
// 如果获取到令牌,则继续处理
out.collect(value);
} else {
// 否则丢弃或者延迟处理
}
}
@Override
public void flatMap2(Double newRate, Collector<String> out) throws Exception {
// 更新广播状态中的限流速率
broadcastState.put("rate", newRate);
// 更新本地限流器的速率
rateLimiter.setRate(newRate);
}
} 在这个版本的 DynamicRateControlledFunction 中,我们重写了 flatMap2 方法来处理来自广播流的新速率值。每当收到新的速率配置时,它会更新广播状态和本地的 RateLimiter 实例。
为了让外部系统能够触发速率更新,你可以设置一个 Akka Actor 来监听 HTTP 请求或其他形式的远程调用。这个 Actor 可以直接向广播流发送消息,或者通过其他方式通知广播流有新的速率配置需要传播。 例如,你可以创建一个简单的 REST API 服务器,当接收到 POST 请求时,它会调用 Akka Actor 发送速率更新消息到广播流。 import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.concurrent.ExecutionContextExecutor
implicit val system: ActorSystem = ActorSystem("my-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext: ExecutionContextExecutor = system.dispatcher
val route =
path("update-rate") {
post {
entity(as[Double]) { newRate =>
// 调用广播流的源函数来发送新的速率配置
RateConfigSource.updateRate(newRate)
complete(s"Rate updated to $newRate")
}
}
}
Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine() // 让应用程序保持运行 这里的 RateConfigSource.updateRate(newRate) 是一个假设的方法,它应该负责将新的速率值推送到广播流中。具体的实现取决于你如何设计广播流的源函数。 通过这种方式,你可以在 Flink 作业启动后动态地调整每个任务实例的限流速率,而不需要重启整个作业。这种方法不仅提供了灵活性,还保证了高可用性和实时性。 |
为了对正在执行的实时任务进行更细粒度的控制,需要提供任务限流&执行状态可视化功能,以应对实时任务执行期间执行批量任务构建,或者业务系统中有突发事件需要紧急限流。
需要在控制面中提供两个子功能予以配合:
任务限流
功能后,可以立即从可视化看板上观察到执行吞吐的变化The text was updated successfully, but these errors were encountered: