Skip to content

Commit

Permalink
Merge pull request #235 from Guocork/master
Browse files Browse the repository at this point in the history
  • Loading branch information
wangeguo authored Sep 6, 2024
2 parents 3fe7ad4 + 4b5de2a commit 839dd2c
Show file tree
Hide file tree
Showing 4 changed files with 111 additions and 0 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions controllers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,3 +24,4 @@ tokio.workspace = true
toml.workspace = true
tracing-subscriber.workspace = true
tracing.workspace = true
chrono = "0.4.38"
2 changes: 2 additions & 0 deletions controllers/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ mod actor_controller;
mod credentials_watcher;
mod namespace_watcher;
mod playbook_controller;
mod timeout_controller;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
Expand All @@ -57,6 +58,7 @@ async fn main() -> anyhow::Result<()> {
_ = actor_controller::new(&ctx) => tracing::warn!("actor controller exited"),
_ = credentials_watcher::new(&ctx) => tracing::warn!("credentials watcher exited"),
_ = namespace_watcher::new(&ctx) => tracing::warn!("namespace watcher exited"),
_ = timeout_controller::new(&ctx) => tracing::warn!("timeout controller exited")
}

Ok(())
Expand Down
107 changes: 107 additions & 0 deletions controllers/src/timeout_controller.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
// Copyright (c) The Amphitheatre Authors. All rights reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use amp_common::resource::Playbook;
use amp_resources::playbook::delete;
use chrono::{DateTime, Duration, TimeDelta, Utc};
use futures::{future, StreamExt};
use kube::Client;
use kube::{
runtime::{reflector, watcher, WatchStreamExt},
Api,
};
use tracing::{error, info, warn};

use crate::context::Context;

/// The strategy is to evaluate the execution status of the playbook.
enum Strategy {
/// Handling the expiration of the playbook.
Expired,

/// Handling the retention of the playbook.
Remain(TimeDelta),
}

// Implement the From trait for Strategy.
impl From<DateTime<Utc>> for Strategy {
fn from(value: DateTime<Utc>) -> Self {
let now = Utc::now();
if value < now {
Self::Expired
} else {
Self::Remain(value - now)
}
}
}

pub async fn new(ctx: &Arc<Context>) {
let client = ctx.k8s.clone();
let api = Api::<Playbook>::all(client.clone());
let config = watcher::Config::default();
let (reader, writer) = reflector::store();
let rf = reflector(writer, watcher(api, config));

tokio::spawn(async move {
if let Err(e) = reader.wait_until_ready().await {
error!("Failed to wait until ready: {:?}", e);
return;
}
info!("Timeout controller is running...");
loop {
for p in reader.state() {
if let Err(err) = handle(p.as_ref(), &client).await {
error!("Delete playbook failed: {}", err.to_string());
}
}
tokio::time::sleep(std::time::Duration::from_secs(5 * 60)).await;
}
});

rf.applied_objects().for_each(|_| future::ready(())).await;
}

async fn handle(playbook: &Playbook, client: &Client) -> anyhow::Result<()> {
if let Some(ttl) = playbook
.metadata
.annotations
.as_ref()
.and_then(|annotations| annotations.get("ttl"))
.and_then(|ttl_str| ttl_str.parse::<i64>().ok())
{
if let Some(timestamp) = playbook.metadata.creation_timestamp.as_ref() {
let expiration_time = timestamp.0 + Duration::seconds(ttl);
let stratege = Strategy::from(expiration_time);
match stratege {
Strategy::Expired => {
if let Some(name) = &playbook.metadata.name {
delete(client, name).await?;
}
}
Strategy::Remain(time) => {
if time == Duration::days(3) {
send_message().await;
}
}
}
}
}
Ok(())
}

async fn send_message() {
warn!("Email sending functionality is not implemented yet");
}

0 comments on commit 839dd2c

Please sign in to comment.