Skip to content

Commit

Permalink
heartbeat callback during watching resources
Browse files Browse the repository at this point in the history
  • Loading branch information
Milan Unger committed Apr 27, 2022
1 parent c6af5bd commit 8463d1b
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 12 deletions.
2 changes: 1 addition & 1 deletion pack.pl
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
name(k8s_client).
version('1.0.3').
version('1.0.4').
title('kubernetes API client with watcher functionality').
author('Milan Unger', 'http://milung.eu/').
packager('Milan Unger', 'http://milung.eu/').
Expand Down
31 changes: 20 additions & 11 deletions prolog/k8s_client.pl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
k8s_watch_resources_async(2, +, +, +, -, +),
watch_modification_call(2, +, +, +, -, +, -),
watch_resources_loop(2, +, +, +, +, +),
watch_stream(2, +, +, +, +).
watch_stream(2, +, +, +, +, +).

%%% PUBLIC PREDICATES %%%%%%%%%%%%%%%%%%%%%%%%%%

Expand Down Expand Up @@ -159,6 +159,9 @@
% `Options` are same as for the `k8s_get_resource/6` with extra option:
% * `k8s_resource_version(ResourceVersion:atom)` - if specified the initial list is retrieved for the changed since the specified resource version.
% This option is used primary for internal purposes, and can be reset back to 0.
% * `heartbeat_callback(:Callback)` - the `Callback` is invoked each time there is a change, error, or channel timeout occuring during watching the resource.
% This may be usefull for healthiness check of the controller. While the loop tends to be robust to typical issue of the errors during watching the callback
% may implement additional level of robustness. The failure of the callback is ignored.
k8s_watch_resources(Callback, ApiGroup, Version, ResourceTypeName, Options) :-
select_option(k8s_resource_version(ResourceVersion), Options, Options1, 0),
watch_resources_loop(Callback, ApiGroup, Version, ResourceTypeName, state(ResourceVersion, []), Options1).
Expand All @@ -171,10 +174,9 @@
thread_create(
k8s_watch_resources(Callback, ApiGroup, Version, ResourceTypeName, [watcher_id(Id), timeout(1) | Options]),
Id,
[ at_exit(retractall(watcher_status(Id,_)))
[ at_exit(retractall(watcher_status(Id, _)))
]
).

).

%%% PRIVATE PREDICATES %%%%%%%%%%%%%%%%%%%%%%%%%

Expand Down Expand Up @@ -500,7 +502,9 @@
},
print_message(informational, kubernetes(config_loaded, pod)),
!.


noop_healtz.

path_to_posix(Path, Posix) :-
atomic_list_concat(Segments, '\\', Path),
atomic_list_concat(Segments, '/', Posix).
Expand Down Expand Up @@ -595,24 +599,30 @@
uri_components(Uri, UriComponents),
http_open( Uri, Stream, Options2),
( option(watcher_id(Id), Options)
-> retractall(watcher_status(Id, running(_))),
-> retractall(watcher_status(Id, running(_))),
asserta(watcher_status(Id, running(Stream)))
; Id = []
),
( option(heartbeat_callback(HeartCallback), Options)
-> true
; HeartCallback = noop_healtz
),

!,
catch(
watch_stream(Callback, Stream, Id, State, State1),
watch_stream(Callback, HeartCallback, Stream, Id, State, State1),
error(k8s_watcher_error(connection_broken), State1),
true
),
sleep(1), % reduce CPU load in case of persistent connection error
!, % cut here to avoid recursion stack on async loop
watch_resources_loop(Callback, ApiGroup, Version, ResourceTypeName, State1, Options).

watch_stream(_, _, Id, State, State) :-
watch_stream(_, _, _, Id, State, State) :-
watcher_status(Id, exit_request),
!.
watch_stream(Goal, Stream, Id, StateIn, StateOut) :-
watch_stream(Goal, HeartCallback, Stream, Id, StateIn, StateOut) :-
ignore(HeartCallback),
catch(
( peek_string(Stream, 4, _),
json_read_dict(Stream, Change, [end_of_file(end_of_file)]),
Expand All @@ -621,10 +631,9 @@
Error,
Change = Error
),

watch_modification_call(Goal, Id, Change, StateIn, State0),
!,
watch_stream(Goal, Stream, Id, State0, StateOut).
watch_stream(Goal, HeartCallback, Stream, Id, State0, StateOut).


watcher_exit(Id) :-
Expand Down

0 comments on commit 8463d1b

Please sign in to comment.