Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

提供实时任务限流&执行状态可视化功能 #397

Open
baisui1981 opened this issue Dec 1, 2024 · 2 comments
Open

提供实时任务限流&执行状态可视化功能 #397

baisui1981 opened this issue Dec 1, 2024 · 2 comments

Comments

@baisui1981
Copy link
Member

为了对正在执行的实时任务进行更细粒度的控制,需要提供任务限流&执行状态可视化功能,以应对实时任务执行期间执行批量任务构建,或者业务系统中有突发事件需要紧急限流。

需要在控制面中提供两个子功能予以配合:

  1. 任务限流: 可以实时控制每秒表纪录处理量
  2. 执行状态(吞吐速率)可视化: 设置任务限流功能后,可以立即从可视化看板上观察到执行吞吐的变化
@baisui1981
Copy link
Member Author

flink 内实现限流功能参考

为了在 Flink 作业运行时动态设置 updateRate 方法,你需要一个机制来接收外部的配置更新,并将这些更新应用到正在运行的任务实例中。这可以通过多种方式实现,下面我将详细说明一种常见的方法:使用广播变量和 Akka 的 Actor 系统。

使用广播变量和 Akka Actor 系统

  1. 设置广播变量

首先,在你的主程序(即提交 Flink Job 的地方)中,你需要创建一个广播流,用于发送最新的限流配置给所有并行任务实例。你可以使用一个特殊的 SourceFunction 来生成包含新速率值的消息,并将其与你的主要数据流合并。

// 创建一个广播流,用于发送新的限流配置
DataStream<Double> rateConfigStream = env.addSource(new RateConfigSource())
    .broadcast();

// 主要的数据流
DataStream<String> mainDataStream = ...;

// 将两者连接起来
ConnectedStreams<String, Double> connectedStreams = mainDataStream.connect(rateConfigStream);

// 应用自定义的 CoFlatMapFunction 或 RichCoFlatMapFunction 处理逻辑
connectedStreams.flatMap(new DynamicRateControlledFunction());
  1. 修改 DynamicRateControlledFunction
    接下来,修改 DynamicRateControlledFunction 类以支持接收广播变量中的最新配置:
public class DynamicRateControlledFunction extends RichCoFlatMapFunction<String, Double, String> {

    private transient RateLimiter rateLimiter;
    private transient BroadcastState<String, Double> broadcastState;

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        
        // 获取广播状态
        ValueStateDescriptor<Double> descriptor = new ValueStateDescriptor<>("rate", Double.class);
        broadcastState = getRuntimeContext().getBroadcastState(descriptor);
        
        // 初始化默认限流器
        double initialRate = broadcastState.get("rate") != null ? broadcastState.get("rate") : 10.0;
        rateLimiter = RateLimiter.create(initialRate); // 假设每秒允许10个事件
    }

    @Override
    public void flatMap1(String value, Collector<String> out) throws Exception {
        if (rateLimiter.tryAcquire()) {
            // 如果获取到令牌,则继续处理
            out.collect(value);
        } else {
            // 否则丢弃或者延迟处理
        }
    }

    @Override
    public void flatMap2(Double newRate, Collector<String> out) throws Exception {
        // 更新广播状态中的限流速率
        broadcastState.put("rate", newRate);
        // 更新本地限流器的速率
        rateLimiter.setRate(newRate);
    }
}

在这个版本的 DynamicRateControlledFunction 中,我们重写了 flatMap2 方法来处理来自广播流的新速率值。每当收到新的速率配置时,它会更新广播状态和本地的 RateLimiter 实例。

  1. 使用 Akka Actor 系统接收外部命令

为了让外部系统能够触发速率更新,你可以设置一个 Akka Actor 来监听 HTTP 请求或其他形式的远程调用。这个 Actor 可以直接向广播流发送消息,或者通过其他方式通知广播流有新的速率配置需要传播。

例如,你可以创建一个简单的 REST API 服务器,当接收到 POST 请求时,它会调用 Akka Actor 发送速率更新消息到广播流。

import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import scala.concurrent.ExecutionContextExecutor

implicit val system: ActorSystem = ActorSystem("my-system")
implicit val materializer: ActorMaterializer = ActorMaterializer()
implicit val executionContext: ExecutionContextExecutor = system.dispatcher

val route =
  path("update-rate") {
    post {
      entity(as[Double]) { newRate =>
        // 调用广播流的源函数来发送新的速率配置
        RateConfigSource.updateRate(newRate)
        complete(s"Rate updated to $newRate")
      }
    }
  }

Http().bindAndHandle(route, "localhost", 8080)
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine() // 让应用程序保持运行

这里的 RateConfigSource.updateRate(newRate) 是一个假设的方法,它应该负责将新的速率值推送到广播流中。具体的实现取决于你如何设计广播流的源函数。

通过这种方式,你可以在 Flink 作业启动后动态地调整每个任务实例的限流速率,而不需要重启整个作业。这种方法不仅提供了灵活性,还保证了高可用性和实时性。

@baisui1981
Copy link
Member Author

#353

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant