
Pruned Dynamic Programming Algorithm #333

Open
aaltenbernd opened this issue Nov 27, 2024 · 1 comment

Comments

@aaltenbernd

First of all, I would like to sincerely thank you for this library. It works very well for our needs.

In our use case, we are interested in finding a single optimal change point (based on a cost function) to segment the data into two partitions. Since the number of change points is known in advance, the Dynamic Programming approach seems to be the most suitable.

I was wondering if the pruning technique described in this paper, which promises linear runtime in the best case, has been implemented. Alternatively, are there any other pruning techniques integrated into the library?

Thank you very much for your time and assistance.

@aaltenbernd (Author)

I am still interested in exploring the pruning aspect of Dynamic Programming.
For our current problem, we've observed that it can be reduced to the "At Most One Change" procedure.
We've implemented a `BaseEstimator` subclass for this approach, in case anyone is interested.

from ruptures.costs import cost_factory
from ruptures.base import BaseCost, BaseEstimator


class Amoc(BaseEstimator):
    """At Most One Change: exhaustive search for a single optional change point."""

    def __init__(self, model="l2", custom_cost=None, jump=5, params=None):
        if custom_cost is not None and isinstance(custom_cost, BaseCost):
            self.cost = custom_cost
        else:
            self.model_name = model
            if params is None:
                self.cost = cost_factory(model=model)
            else:
                self.cost = cost_factory(model=model, **params)
        self.jump = jump
        self.n_samples = None

    def seg(self):
        # baseline partition: no change point at all
        cost_no_change = self.cost.error(0, self.n_samples)
        best_cost = {(0, self.n_samples): cost_no_change}

        # respect the cost function's minimal segment length on both sides,
        # otherwise cost.error can raise NotEnoughPoints on short segments
        start = self.cost.min_size
        stop = self.n_samples - self.cost.min_size + 1
        for bkp in range(start, stop, self.jump):
            left_cost = self.cost.error(0, bkp)
            right_cost = self.cost.error(bkp, self.n_samples)
            total_cost = left_cost + right_cost

            if total_cost < sum(best_cost.values()):
                best_cost = {(0, bkp): left_cost, (bkp, self.n_samples): right_cost}

        return best_cost

    def fit(self, signal) -> "Amoc":
        self.cost.fit(signal)
        self.n_samples = signal.shape[0]
        return self

    def predict(self):
        partition = self.seg()
        # ruptures convention: sorted segment ends, last element is n_samples
        return sorted(end for _, end in partition.keys())

    def fit_predict(self, signal):
        self.fit(signal)
        return self.predict()
