:- module(k8s_client, [
    k8s_create_resource/6,       % +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Instance:dict, -InstanceOut:dict, +Options:list
    k8s_delete_resource/5,       % +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Instance:dict/atom, +Options:list
    k8s_get_resource/6,          % +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, ?InstanceName:atomic, -Instance:dict, +Options:list
    k8s_resource_types/2,        % -ResourceTypes:list(dict), +Options:list
    k8s_update_resource/6,       % +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Instance:dict, -InstanceOut:dict, +Options:list
    k8s_watch_resources/5,       % :Callback, +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Options:list
    k8s_watch_resources_async/6  % :Callback, +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, -StopWatcher, +Options:list
  ]).
% Example:
%
% ```prolog
% ?- use_module(library(k8s_client)).
% ?- k8s_get_resource(core, v1, pods, PodName, _, [k8s_namespace(myns)]).
% PodName = "dex-567cdd88cd-dbv9x" ;
% PodName = "envoy-proxy-599b679cf7-94764" ;
% ...
% ```
:- set_prolog_flag(generate_debug_info, false).

:- use_module(library(yaml)).
:- use_module(library(http/http_open)).
:- use_module(library(http/json)).
:- use_module(library(http/http_client)).

% Run-time state kept in the module:
%   cluster_resources(ClusterName, ResourceTypes)  - resource types cached per cluster
%   watcher_status(WatcherId, Status)              - Status is running(Stream) or exit_request
:- dynamic
    cluster_resources/2,
    watcher_status/2.

% The first argument of each predicate is the modification callback, invoked
% as call(Callback, ChangeType, Instance). watch_modification_call is declared
% with arity 5, which is the arity used by its caller watch_stream/6.
:- meta_predicate
    k8s_watch_resources(2, +, +, +, +),
    k8s_watch_resources_async(2, +, +, +, -, +),
    watch_modification_call(2, +, +, +, -),
    watch_resources_loop(2, +, +, +, +, +),
    watch_stream(2, +, +, +, +, -).

%%% PUBLIC PREDICATES %%%%%%%%%%%%%%%%%%%%%%%%%%
%!  k8s_create_resource(+ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Instance:dict, -InstanceOut:dict, +Options:list)
%
%   Posts Instance as JSON to the collection URI of the resource type and
%   unifies InstanceOut with the reply of the API server. Options are
%   completed by context_options/2 before the call.
k8s_create_resource(ApiGroup, Version, ResourceTypeName, Instance, InstanceOut, Options) :-
    context_options(Options, CtxOptions),
    resource_uri(ApiGroup, Version, ResourceTypeName, CollectionUri, CtxOptions),
    RequestOptions = [method(post), post(json(Instance)) | CtxOptions],
    api_get(CollectionUri, InstanceOut, RequestOptions).
%!  k8s_delete_resource(+ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Instance, +Options:list)
%
%   Issues a DELETE request for one resource instance. Instance is either
%   the resource dict (its `metadata.name` is used) or the instance name.
k8s_delete_resource(ApiGroup, Version, ResourceTypeName, Instance, Options) :-
    delete_target_name_(Instance, InstanceName),
    context_options(Options, CtxOptions),
    resource_uri(ApiGroup, Version, ResourceTypeName, CollectionUri, CtxOptions),
    atomic_list_concat([CollectionUri, InstanceName], '/', InstanceUri),
    api_get(InstanceUri, _, [method(delete) | CtxOptions]).

% Name of the instance to delete: taken from the dict, or the term itself.
delete_target_name_(Instance, InstanceName) :-
    is_dict(Instance),
    !,
    InstanceName = Instance.metadata.name.
delete_target_name_(InstanceName, InstanceName).
%!  k8s_get_resource(+ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, ?InstanceName:atomic, -Instance:dict, +Options:list) is nondet.
%
%   Unifies InstanceName and Instance with the object representing a resource
%   of the Kubernetes API. If InstanceName is not bound then all instances
%   are retrieved. ApiGroup is either a valid name of a Kubernetes API group
%   or the atom `core`.
%
%   The actual cluster address, context, and namespace are provided either by
%   options or loaded from the configuration. The options below are
%   supported, in addition to the options passed down to http_open/3:
%
%     * k8s_config(Config:dict)
%       kubectl configuration. If not provided, the configuration is loaded
%       by the first successful of the following possibilities: the file(s)
%       named by the `KUBECONFIG` environment variable, `.kube/config` in the
%       user's home directory, the service account files of a pod.
%     * k8s_context(Context:atom)
%       name of the context to use from the `k8s_config` option. If not
%       specified, the `current-context` property of `k8s_config` is used as
%       the default value.
%     * k8s_namespace(Namespace)
%       the namespace from which to load the resource instance(s). Shall be
%       set to `all` if all namespaces shall be listed (will fail if
%       InstanceName is bound and the resource is namespaced). If not
%       specified, the namespace provided as part of the context or the
%       `default` namespace is used.
%     * k8s_resource_types_mode(Mode)
%       where Mode is one of `cache`, `renew`, `remote`, `local`. Defaults to
%       `local`. Used when resolving whether the resource is namespaced.
%       See also k8s_resource_types/2.
%     * k8s_selectors(Selectors:list)
%       list of selectors to apply to the resource retrieval (plain, not
%       encoded form).
%     * k8s_query(Query:term)
%       a query parameter to add to the REST call, in the form specified by
%       uri_query_components/2. This option is concatenated if used multiple
%       times.
k8s_get_resource(ApiGroup, Version, ResourceTypeName, InstanceName, Instance, Options) :-
    % list retrieval: enumerate the items of the whole collection
    var(InstanceName),
    var(Instance),
    context_options(Options, CtxOptions),
    resource_uri(ApiGroup, Version, ResourceTypeName, CollectionUri, CtxOptions),
    api_get(CollectionUri, Listing, CtxOptions),
    Items = Listing.items,
    member(Instance, Items),
    ignore( InstanceName = Instance.get(metadata).name).
k8s_get_resource(ApiGroup, Version, ResourceTypeName, InstanceName, Instance, Options) :-
    % single instance retrieval: address the instance by its name
    nonvar(InstanceName),
    var(Instance),
    context_options(Options, CtxOptions),
    resource_uri(ApiGroup, Version, ResourceTypeName, CollectionUri, CtxOptions),
    atomic_list_concat([CollectionUri, InstanceName], '/', InstanceUri),
    api_get(InstanceUri, Instance, CtxOptions).
%!  k8s_resource_types(-ResourceTypes:list(dict), +Options:list)
%
%   Unifies ResourceTypes with the resource types of the cluster selected by
%   Options. The following option is recognised here:
%
%     * k8s_resource_types_mode(Mode)
%       where Mode is one of `cache` (default), `renew`, `remote`, `local`.
%       In the standard mode, the resources are cached per cluster and
%       retrieved from the cache if available. If `renew` or `remote` mode
%       is requested then the resource list is retrieved directly from the
%       API server. The `remote` and `local` modes leave the cache
%       untouched. The caching significantly speeds up subsequent operations
%       with Kubernetes resources.
k8s_resource_types(ResourceTypes, Options) :-
    % answer from the per-cluster cache unless a remote read is requested
    select_option(k8s_resource_types_mode(Mode), Options, PlainOptions, cache),
    \+ memberchk(Mode, [renew, remote]),
    context_options(PlainOptions, CtxOptions),
    resource_types_cluster_(CtxOptions, ClusterName),
    cluster_resources(ClusterName, ResourceTypes),
    !.
k8s_resource_types(ResourceTypes, Options) :-
    % read core and grouped resource types from the API server
    select_option(k8s_resource_types_mode(Mode), Options, PlainOptions, cache),
    Mode \= local,
    context_options(PlainOptions, CtxOptions),
    api_resources_core(CoreTypes, CtxOptions),
    api_resources_groups(GroupedTypes, CtxOptions),
    append(CoreTypes, GroupedTypes, ResourceTypes),
    % cache results
    (   memberchk(Mode, [cache, renew])
    ->  resource_types_cluster_(CtxOptions, ClusterName),
        retractall(cluster_resources(ClusterName, _)),
        asserta(cluster_resources(ClusterName, ResourceTypes))
    ;   true
    ),
    !.

% Cluster name of the context selected by the (already completed) options.
resource_types_cluster_(CtxOptions, ClusterName) :-
    option(k8s_context(ContextName), CtxOptions),
    option(k8s_config(Config), CtxOptions),
    config_get_context(Config, ContextName, Context),
    ClusterName = Context.cluster.
%!  k8s_update_resource(+ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Instance:dict, -InstanceOut:dict, +Options:list)
%
%   Sends Instance as JSON with the PUT method to the URI of the instance
%   named by `Instance.metadata.name` and unifies InstanceOut with the reply.
k8s_update_resource(ApiGroup, Version, ResourceTypeName, Instance, InstanceOut, Options) :-
    context_options(Options, CtxOptions),
    resource_uri(ApiGroup, Version, ResourceTypeName, CollectionUri, CtxOptions),
    InstanceName = Instance.metadata.name,
    atomic_list_concat([CollectionUri, InstanceName], '/', InstanceUri),
    RequestOptions = [method(put), post(json(Instance)) | CtxOptions],
    api_get(InstanceUri, InstanceOut, RequestOptions).
%!  k8s_watch_resources(:Callback, +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, +Options:list)
%
%   Watches the resources of the given type. Callback is invoked as
%   call(Callback, ChangeType, Instance), where ChangeType is the `added`,
%   `modified`, or `deleted` atom. The initial list of instances is provided
%   as a sequence of `added` callbacks after the call of this predicate.
%   The call blocks the caller thread.
%
%   Options are the same as for k8s_get_resource/6, with these extra options:
%
%     * k8s_resource_version(ResourceVersion:atom)
%       if specified, the initial list is retrieved for the changes since
%       the specified resource version. This option is used primarily for
%       internal purposes, and can be reset back to 0.
%     * heartbeat_callback(Callback)
%       the `Callback` is invoked each time there is a change, error, or
%       channel timeout occurring while watching the resource. This may be
%       useful for a health check of the controller. While the loop tends to
%       be robust to the typical errors during watching, the callback may
%       implement an additional level of robustness. The failure of the
%       callback is ignored.
k8s_watch_resources(Callback, ApiGroup, Version, ResourceTypeName, Options) :-
    (   select_option(k8s_resource_version(ResourceVersion), Options, WatchOptions)
    ->  true
    ;   ResourceVersion = 0,           % no version requested: start from scratch
        WatchOptions = Options
    ),
    InitialState = state(ResourceVersion, []),
    watch_resources_loop(Callback, ApiGroup, Version, ResourceTypeName, InitialState, WatchOptions).
%!  k8s_watch_resources_async(:Callback, +ApiGroup:atom, +Version:atom, +ResourceTypeName:atom, -StopWatcher, +Options:list)
%
%   Starts k8s_watch_resources/5 in a new thread. The watcher is stopped by
%   call(StopWatcher). Other arguments are the same as for
%   k8s_watch_resources/5. Be aware that the Callback is invoked from a
%   different thread than the thread calling this predicate.
k8s_watch_resources_async(Callback, ApiGroup, Version, ResourceTypeName, k8s_client:watcher_exit(Id), Options) :-
    thread_create(
        k8s_watch_resources(Callback, ApiGroup, Version, ResourceTypeName, [watcher_id(Id), timeout(1) | Options]),
        Id,
        [ at_exit(retractall(watcher_status(Id, _)))
        ]
    ).

%%% PRIVATE PREDICATES %%%%%%%%%%%%%%%%%%%%%%%%%

% api_get(+Url, -Reply): api_get/3 without extra options.
api_get(Url, Reply) :-
    api_get(Url, Reply, []).

% api_get(+Url, -Reply, +Options): resolves Url against the configured
% cluster and performs the HTTP request, reading the JSON reply as a dict.
api_get(Url, Reply, Options) :-
    config_connection_options(Url, UriComponents, Options, Options1),
    uri_components(Uri, UriComponents),
    http_get(Uri, Reply, [json_object(dict) | Options1]).

% Resource types of every version of the core API (`/api`).
api_resources_core(Resources, Options) :-
    api_get(api, Versions, Options),
    foldl(api_resources_core_(Options), Versions.versions, [], Resources).

% Appends the resource types of one core API version, tagged with
% group `core` and the version.
api_resources_core_(Options, Version, In, Out) :-
    atomic_list_concat([api, Version], '/', Url),
    api_get(Url, Resources, Options),
    maplist(put_dict(_{group: core, version: Version}), Resources.resources, Resources1),
    append(In, Resources1, Out).

% Resource types of the preferred version of every API group (`/apis`).
api_resources_groups(Resources, Options) :-
    api_get(apis, Groups, Options),
    foldl(api_resources_groups_(Options), Groups.groups, [], Resources).

api_resources_groups_(Options, Group, In, Resources) :-
    Preferred = Group.preferredVersion.version,
    api_resources_groups_(Options, Preferred, Group.preferredVersion, In, Resources).

% Appends the resource types of one group version. Each resource type is
% tagged with the group name, the version and the isPreferred flag.
api_resources_groups_(Options, Preferred, GroupVersion, In, Out) :-
    atomic_list_concat([apis, GroupVersion.groupVersion], '/', Url),
    api_get(Url, GroupResources, Options),
    (   Preferred = GroupVersion.version
    ->  IsPreferred = true
    ;   IsPreferred = false
    ),
    % The group name is the part of `groupVersion` before the first '/'.
    % It is compared with the requested ApiGroup by is_resource_namespaced/4,
    % so it must be the real name and not a constant.
    split_string(GroupVersion.groupVersion, "/", "", [GroupText | _]),
    atom_string(GroupName, GroupText),
    maplist(
        put_dict(_{group: GroupName, version: GroupVersion.version, isPreferred: IsPreferred}),
        GroupResources.resources,
        Resources1),
    append(In, Resources1, Out).
% atomic_eq(?Left, ?Right): true when Left and Right have the same text,
% regardless of whether they are atoms or strings. At least one of them
% has to be bound.
atomic_eq(Left, Right) :-
    nonvar(Left),
    atom_string(LeftA, Left),
    atom_string(LeftA, Right),
    !.
atomic_eq(Left, Right) :-
    nonvar(Right),
    atom_string(RightA, Right),
    atom_string(RightA, Left),
    !.

% base64_certificate(+Cert64, -Certificate): decodes the base64 text and
% loads the certificate from it. The temporary stream is closed even when
% loading fails or raises an exception.
base64_certificate(Cert64, Certificate) :-
    base64(Cert, Cert64),
    atom_codes(Cert, CertCodes),
    setup_call_cleanup(
        open_codes_stream(CertCodes, CertStream),
        load_certificate(CertStream, Certificate),
        close(CertStream)).

% config_connection_options(+ResourceUrl, -Server, +OptionsIn, -OptionsOut):
% takes the configuration and the context name from the options, or loads
% the configuration and uses its current context, and delegates to
% config_connection_options/6.
config_connection_options(ResourceUrl, Server, OptionsIn, OptionsOut) :-
    (   select_option(k8s_config(Config), OptionsIn, Options1)
    ->  true
    ;   load_config(Config),
        Options1 = OptionsIn
    ),
    (   select_option(k8s_context(ContextName), Options1, Options2)
    ->  true
    ;   config_current_context(Config, ContextName),
        Options2 = Options1
    ),
    config_connection_options(ResourceUrl, Config, ContextName, Server, Options2, OptionsOut).

% Builds the URI components of the request against the cluster of the
% context and extends the options with the cluster and client options.
config_connection_options(ResourceUrl, Config, ContextName, ServerUriComponents, OptionsIn, OptionsOut) :-
    config_get_context(Config, ContextName, Ctx),
    atom_string(ClusterName, Ctx.cluster),
    config_get_cluster(Config, ClusterName, Cluster),
    config_connection_resource_uri(ResourceUrl, Cluster.server, UriComponents),
    config_connections_queries(UriComponents, OptionsIn, ServerUriComponents, Options0),
    config_cluster_options(Cluster, Options0, Options1),
    config_client_options(Config, Ctx, Options1, OptionsOut),
    !.

% Moves the query-related options into the search part of the URI.
config_connections_queries(UriComponentsIn, OptionsIn, UriComponentsOut, OptionsOut) :-
    uri_data(search, UriComponentsIn, Search),
    (   var(Search)
    ->  Query0 = []
    ;   uri_query_components(Search, Query0)
    ),
    config_connections_queries_(Query0, Queries, OptionsIn, OptionsOut),
    (   Queries = []
    ->  UriComponentsIn = UriComponentsOut
    ;   uri_query_components(QueriesSegment, Queries),
        uri_data(search, UriComponentsIn, QueriesSegment, UriComponentsOut)
    ).
% config_connections_queries_(+QueriesIn, -QueriesOut, +OptionsIn, -OptionsOut):
% consumes the k8s_query/1, k8s_resource_version/1 and k8s_selectors/1
% options one by one and prepends the matching query components.
config_connections_queries_(QueriesIn, QueriesOut, OptionsIn, OptionsOut) :-
    select_option(k8s_query(Query), OptionsIn, Options),
    is_list(Query),
    append(Query, QueriesIn, Queries),
    !,
    config_connections_queries_(Queries, QueriesOut, Options, OptionsOut).
config_connections_queries_(QueriesIn, QueriesOut, OptionsIn, OptionsOut) :-
    select_option(k8s_query(Query), OptionsIn, Options),
    !,
    config_connections_queries_([Query | QueriesIn], QueriesOut, Options, OptionsOut).
config_connections_queries_(QueriesIn, QueriesOut, OptionsIn, OptionsOut) :-
    select_option(k8s_resource_version(ResourceVersion), OptionsIn, Options),
    ResourceVersion \= 0,
    !,
    config_connections_queries_([resourceVersion = ResourceVersion | QueriesIn], QueriesOut, Options, OptionsOut).
config_connections_queries_(QueriesIn, QueriesOut, OptionsIn, OptionsOut) :-
    select_option(k8s_selectors(Selectors), OptionsIn, Options),
    (   is_list(Selectors)
    ->  atomic_list_concat(Selectors, ',', SelectorsText)
    ;   SelectorsText = Selectors
    ),
    !,
    config_connections_queries_([labelSelector = SelectorsText | QueriesIn], QueriesOut, Options, OptionsOut).
config_connections_queries_(Queries, Queries, Options, Options).

% Appends ResourceUrl to the path of the cluster server address.
config_connection_resource_uri(ResourceUrl, ServerName, UriComponents) :-
    atom_string(Server, ServerName),
    uri_components(Server, UriComponents0),
    uri_data(path, UriComponents0, Path),
    directory_file_path(Path, ResourceUrl, FullPath),
    uri_data(path, UriComponents0, FullPath, UriComponents).

% Certificate authority of the cluster: inline base64 data or a file.
config_ca_options(Cluster, OptionsIn, [cacerts([certificate(CaCert)]) | OptionsIn]) :-
    base64_certificate(Cluster.get('certificate-authority-data'), CaCert),
    !.
config_ca_options(Cluster, OptionsIn, [cacerts([file(CaCert)]) | OptionsIn]) :-
    Cluster.get('certificate-authority') = CaCert,
    !.
config_ca_options(_, _, _) :-
    print_message(error, kubernetes(unsupported_config, cluster)),
    fail.

% Client authentication of the user of the context: inline certificate
% data, certificate files, or a bearer token.
config_client_options(Config, Ctx, OptionsIn, [certificate_key_pairs([ClientCert-ClientKey]) | OptionsIn]) :-
    atom_string(UserName, Ctx.user),
    config_get_user(Config, UserName, User),
    base64(ClientCert, User.get('client-certificate-data')),
    base64(ClientKey, User.get('client-key-data')),
    !.
config_client_options(Config, Ctx, OptionsIn, [certificate_file(ClientCert), key_file(ClientKey) | OptionsIn]) :-
    atom_string(UserName, Ctx.user),
    config_get_user(Config, UserName, User),
    % 'client-certificate' and 'client-key' hold file names, so they are
    % passed on as they are; the private key goes to the key_file/1 option.
    atom_string(ClientCert, User.get('client-certificate')),
    atom_string(ClientKey, User.get('client-key')),
    !.
config_client_options(Config, Ctx, OptionsIn, [authorization(bearer(Token)) | OptionsIn]) :-
    atom_string(UserName, Ctx.user),
    config_get_user(Config, UserName, User),
    atom_string(Token, User.get(token)),
    !.
config_client_options(_, _, _, _) :-
    print_message(error, kubernetes(unsupported_config, user)),
    fail.

% Adds the CA, proxy and TLS verification options of the cluster.
config_cluster_options(Cluster, OptionsIn, OptionsOut) :-
    config_ca_options(Cluster, OptionsIn, Options0),
    (   Proxy = Cluster.get(proxy)
    ->  Options1 = [proxy(Proxy) | Options0]
    ;   Options1 = Options0
    ),
    (   Cluster.get('insecure-skip-tls-verify') = true
    ->  OptionsOut = [cert_verify_hook(cert_accept_any) | Options1]
    ;   OptionsOut = Options1
    ).

% Current context of the loaded configuration.
config_current_context(Context) :-
    load_config(Cfg),
    config_current_context(Cfg, Context).

config_current_context(Cfg, Context) :-
    atom_string(Context, Cfg.get('current-context')).

% Cluster entry of the configuration with the given name.
config_get_cluster(Config, ClusterName, Cluster) :-
    member(ClusterDict, Config.clusters),
    atom_string(ClusterName, ClusterDict.name),
    Cluster = ClusterDict.cluster.
% Context entry of the configuration with the given name.
config_get_context(Config, ContextName, Context) :-
    member(ContextDict, Config.contexts),
    atom_string(ContextName, ContextDict.name),
    Context = ContextDict.context.

% User entry of the configuration with the given name.
config_get_user(Config, UserName, User) :-
    member(UserDict, Config.users),
    atom_string(UserName, UserDict.name),
    User = UserDict.user.

% context_options(+OptionsIn, -OptionsOut): completes the options with
% k8s_config/1, k8s_context/1 and k8s_namespace/1 when they are missing.
% The configuration is loaded by load_config/1, the context defaults to the
% current context of the configuration, and the namespace defaults to the
% namespace of the selected context or to "default".
context_options(OptionsIn, OptionsOut) :-
    (   option(k8s_config(Config), OptionsIn)
    ->  Options1 = OptionsIn
    ;   load_config(Config),
        Options1 = [k8s_config(Config) | OptionsIn]
    ),
    (   option(k8s_context(ContextName), Options1)
    ->  Options2 = Options1
    ;   config_current_context(Config, ContextName),
        Options2 = [k8s_context(ContextName) | Options1]
    ),
    (   option(k8s_namespace(_), Options2)
    ->  OptionsOut = Options2
    ;   % ContextName is already decided above. It must not be checked
        % against the current context again, otherwise an explicitly
        % requested non-current context makes the predicate fail.
        config_get_context(Config, ContextName, Context),
        dict_get_default(Context, namespace, "default", Namespace),
        OptionsOut = [k8s_namespace(Namespace) | Options2]
    ),
    !.

% dict_get_default(+Dict, +Key, +Default, -Value): value of Key in Dict,
% or Default when the key is missing.
dict_get_default(Dict, Key, Default, Value) :-
    Dict.get(Key) = Value
    ->  true
    ;   Value = Default.
% is_resource_namespaced(+ApiGroup, +Version, +ResourceTypeName, +Options):
% true when the matching resource type has `namespaced` set to true.
% Without an explicit k8s_resource_types_mode/1 option only the local cache
% is consulted; when that gives nothing, the resource types of the single
% group version are read instead.
is_resource_namespaced(ApiGroup, Version, ResourceTypeName, Options) :-
    (   option(k8s_resource_types_mode(_), Options)
    ->  LookupOptions = Options                                      % respect caching options
    ;   LookupOptions = [k8s_resource_types_mode(local) | Options]  % or avoid loading all resource types side effect
    ),
    namespaced_candidate_types_(ApiGroup, Version, Options, LookupOptions, ResourceTypes),
    % check if the resource is namespaced
    member(ResourceType, ResourceTypes),
    atomic_eq(ResourceTypeName, ResourceType.name),
    atomic_eq(ApiGroup, ResourceType.group),
    atomic_eq(Version, ResourceType.version),
    !,
    true = ResourceType.namespaced.

% List of resource types to search: the known list of the cluster, or the
% types of the requested group version only.
namespaced_candidate_types_(_, _, _, LookupOptions, ResourceTypes) :-
    k8s_resource_types(ResourceTypes, LookupOptions),
    !.
namespaced_candidate_types_(core, Version, Options, _, ResourceTypes) :-
    !,
    api_resources_core_(Options, Version, [], ResourceTypes).
namespaced_candidate_types_(ApiGroup, Version, Options, _, ResourceTypes) :-
    atomic_list_concat([ApiGroup, Version], '/', GroupVersion),
    api_resources_groups_(Options, Version, _{ version: Version, groupVersion: GroupVersion}, [], ResourceTypes).

% load_and_merge_config_file(+Path, +ConfigIn, -ConfigOut): reads the YAML
% file and merges it into the configuration collected so far. Clusters,
% users and contexts are appended; `current-context` and `preferences` of
% the file replace the collected ones when the file provides them.
load_and_merge_config_file(Path, ConfigIn, ConfigOut) :-
    path_to_posix(Path, PosixPath),
    yaml_read(PosixPath, Loaded),
    Collected = _{
        clusters: [], users: [], contexts: [],
        'current-context': "", preferences: _{}
    }.put(ConfigIn),
    get_dict('current-context', Collected, CollectedContext),
    get_dict(preferences, Collected, CollectedPreferences),
    Incoming = _{
        clusters: [], users: [], contexts: [],
        'current-context': CollectedContext,
        preferences: CollectedPreferences
    }.put(Loaded),
    get_dict(clusters, Collected, Clusters0),
    get_dict(clusters, Incoming, Clusters1),
    append(Clusters0, Clusters1, Clusters),
    get_dict(users, Collected, Users0),
    get_dict(users, Incoming, Users1),
    append(Users0, Users1, Users),
    get_dict(contexts, Collected, Contexts0),
    get_dict(contexts, Incoming, Contexts1),
    append(Contexts0, Contexts1, Contexts),
    get_dict(preferences, Incoming, Preferences),
    get_dict('current-context', Incoming, CurrentContext),
    ConfigOut =
        _{
            apiVersion: v1,
            kind: 'Config',
            preferences: Preferences,
            clusters: Clusters,
            users: Users,
            contexts: Contexts,
            'current-context': CurrentContext
        },
    !.
% load_config(-ConfigDict): loads the kubectl configuration from the first
% source that works: the files named by KUBECONFIG, the `.kube/config` file
% of the user, or the service account files available inside a pod.
load_config(ConfigDict) :-                       % KUBECONFIG variant
    getenv('KUBECONFIG', Path),
    (   current_prolog_flag(windows, true)
    ->  Separator = ';'
    ;   Separator = ':'
    ),
    atomic_list_concat(Files, Separator, Path),
    foldl(load_and_merge_config_file, Files, _{}, ConfigDict),
    print_message(informational, kubernetes(config_loaded, kubeconfig)),
    !.
load_config(ConfigDict) :-                       % ~/.kube/config variant
    member(HomeVariable, ['USERPROFILE', 'HOME']),
    getenv(HomeVariable, HomePath),
    path_to_posix(HomePath, HomePathPx),
    directory_file_path(HomePathPx, '.kube/config', ConfigPath),
    exists_file(ConfigPath),
    yaml_read(ConfigPath, ConfigDict),
    print_message(informational, kubernetes(config_loaded, user_config)),
    !.
load_config(ConfigDict) :-                       % access api from pod
    TokenFile = '/var/run/secrets/kubernetes.io/serviceaccount/token',
    CaFile = '/var/run/secrets/kubernetes.io/serviceaccount/ca.crt',
    NamespaceFile = '/var/run/secrets/kubernetes.io/serviceaccount/namespace',
    exists_file(TokenFile),
    exists_file(CaFile),
    exists_file(NamespaceFile),
    read_file_to_string(TokenFile, Token, []),
    read_file_to_string(NamespaceFile, Namespace, []),
    PodCluster = _{
        name: "default-api",
        cluster: _{
            'certificate-authority': "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
            server: "https://kubernetes.default.svc"
        }
    },
    PodUser = _{
        name: "service-account",
        user: _{
            token: Token
        }
    },
    PodContext = _{
        name: "from-pod",
        context: _{
            cluster: "default-api",
            user: "service-account",
            namespace: Namespace
        }
    },
    ConfigDict = _{
        apiVersion: v1,
        kind: 'Config',
        clusters: [PodCluster],
        users: [PodUser],
        contexts: [PodContext],
        'current-context': "from-pod"
    },
    print_message(informational, kubernetes(config_loaded, pod)),
    !.

% Default heartbeat callback of the watcher: does nothing.
noop_healtz.
% path_to_posix(+Path, -Posix): replaces backslashes by forward slashes.
path_to_posix(Path, Posix) :-
    atomic_list_concat(Segments, '\\', Path),
    atomic_list_concat(Segments, '/', Posix).

% Messages of this library, hooked into print_message/2.
:- multifile prolog:message//1.

prolog:message(kubernetes(unsupported_config, cluster)) -->
    ['Kubernetes: Configuration of the cluster is not supported'].
prolog:message(kubernetes(unsupported_config, user)) -->
    ['Kubernetes: Configuration of the user is not supported'].
prolog:message(kubernetes(watcher_exited, Resource)) -->
    ['Kubernetes: Watching of the resources type `~p` exited '-[Resource]].
prolog:message(kubernetes(watcher_update_failure, Goal, Id)) -->
    ['Kubernetes: Calling the goal ~p from resource controller for resource ~p modification failed'- [Goal, Id]].
prolog:message(kubernetes(config_loaded, kubeconfig)) -->
    ['Kubernetes: Configuration taken using the environment variable KUBECONFIG'].
prolog:message(kubernetes(config_loaded, user_config)) -->
    ['Kubernetes: Configuration taken the users home directory'].
prolog:message(kubernetes(config_loaded, pod)) -->
    ['Kubernetes: Configuration taken from the kubernetes pod service account'].
prolog:message(kubernetes(watch_modification, Change)) -->
    { atom_json_dict(Json, Change, [as(atom), width(0)]) },
    ['Kubernetes: Modification of resources detected: ~p' - [Json] ].
prolog:message(kubernetes(watcher_heartbeat_callback_failure, HeartCallback, Id)) -->
    ['Kubernetes: The heartbeat callback `~w` of the watcher thread `~w` failed' - [HeartCallback, Id] ].

% resource_uri(+ApiGroup, +Version, +ResourceTypeName, -Uri, +Options):
% relative URI of the resource collection. The namespace segment is added
% only when a namespace other than `all` is requested and the resource type
% is namespaced.
resource_uri(ApiGroup, Version, ResourceTypeName, Uri, Options) :-
    option(k8s_namespace(all), Options, all),
    (   ApiGroup = core
    ->  atomic_list_concat([api, Version, ResourceTypeName], '/', Uri)
    ;   atomic_list_concat([apis, ApiGroup, Version, ResourceTypeName], '/', Uri)
    ),
    !.
resource_uri(ApiGroup, Version, ResourceTypeName, Uri, Options) :-
    select_option(k8s_namespace(Namespace), Options, Options1),
    (   is_resource_namespaced(ApiGroup, Version, ResourceTypeName, Options)
    ->  (   ApiGroup = core
        ->  atomic_list_concat([api, Version, namespaces, Namespace, ResourceTypeName], '/', Uri)
        ;   atomic_list_concat([apis, ApiGroup, Version, namespaces, Namespace, ResourceTypeName], '/', Uri)
        )
    ;   resource_uri(ApiGroup, Version, ResourceTypeName, Uri, [k8s_namespace(all) | Options1])
    ),
    !.

% watch_modification_call(:Goal, +Id, +Change, +StateIn, -StateOut):
% handles one item read from the watch stream. The state is
% state(ResourceVersion, KnownResources), where KnownResources is a list of
% resource(Namespace, Name, Version) terms used to tell `added` from
% `modified` and to skip already reported versions.
watch_modification_call(_, Id, _, State, State) :-
    watcher_status(Id, exit_request),           % just exit if exit is requested
    !.
watch_modification_call(_, _, error(timeout_error(read, _), _), State, State) :- !.  % just continue listening
watch_modification_call(_, _, error(_, _), State, _) :-                              % rethrow other errors
    throw(error(k8s_watcher_error(connection_broken), State)).
watch_modification_call(_, _, end_of_file, State, _) :-                              % need reconnect if stream is ended
    throw(error(k8s_watcher_error(connection_broken), State)).
watch_modification_call(_, _, Change, state(_, R), state(0, R)) :-
    Change.get(type) = "ERROR".   % if no bookmark was sent and only old resources available then reset version
watch_modification_call(_, _, Change, state(_, R), state(Version, R)) :-
    Change.get(type) = "BOOKMARK",
    Version = Change.object.metadata.resourceVersion.
watch_modification_call(Goal, _, Change, state(_, R), state(Version, [Resource | Rest])) :-
    memberchk(Change.get(type), ["ADDED", "MODIFIED"]),
    dict_get_default(Change.object.metadata, namespace, [], ResourceNamespace),
    ResourceName = Change.object.metadata.name,
    Version = Change.object.metadata.resourceVersion,
    (   select(resource(ResourceNamespace, ResourceName, OldVersion), R, Rest)
    ->  (   OldVersion = Version
        ->  true                                  % this version was already reported
        ;   call(Goal, modified, Change.object)
        )
    ;   call(Goal, added, Change.object),
        Rest = R
    ),
    Resource = resource(ResourceNamespace, ResourceName, Version).
watch_modification_call(Goal, _, Change, state(_, R), state(Version, Rest)) :-
    % Only a real delete event is handled here. Without the type check a
    % failed `added`/`modified` callback would fall into this clause and
    % report the resource as deleted.
    Change.get(type) = "DELETED",
    dict_get_default(Change.object.metadata, namespace, [], ResourceNamespace),
    ResourceName = Change.object.metadata.name,
    Version = Change.object.metadata.resourceVersion,
    (   select(resource(ResourceNamespace, ResourceName, _), R, Rest)
    ->  call(Goal, deleted, Change.object)
    ;   Rest = R
    ).
watch_modification_call(Goal, Id, _, State, State) :-
    % reached when the callback failed or the item was not recognised
    print_message(error, kubernetes(watcher_update_failure, Goal, Id)).


% watch_resources_loop(:Callback, +ApiGroup, +Version, +ResourceTypeName, +State, +Options):
% opens the watch connection, processes its stream, and reconnects when the
% connection is broken.
watch_resources_loop(_, _, _, ResourceTypeName, state(Version, _), Options) :-  % special handling to exit the async loop
    (   option(watcher_id(Id), Options),
        watcher_status(Id, exit_request)
    ->  (   print_message(informational, kubernetes(watcher_exited, ResourceTypeName)),
            thread_exit(resourceVersion(Version))
        )
    ),
    !.
watch_resources_loop(Callback, ApiGroup, Version, ResourceTypeName, State, Options) :-
    context_options([k8s_query(watch=1), k8s_query(allowWatchBookmarks=true) | Options], Options1),
    resource_uri(ApiGroup, Version, ResourceTypeName, Url, Options1),
    config_connection_options(Url, UriComponents, Options1, Options2),
    uri_components(Uri, UriComponents),
    http_open(Uri, Stream, Options2),
    (   option(watcher_id(Id), Options)
    ->  retractall(watcher_status(Id, running(_))),
        asserta(watcher_status(Id, running(Stream)))
    ;   Id = []
    ),
    (   option(heartbeat_callback(HeartCallback), Options)
    ->  true
    ;   HeartCallback = noop_healtz
    ),
    !,
    catch(
        watch_stream(Callback, HeartCallback, Stream, Id, State, State1),
        error(k8s_watcher_error(connection_broken), State1),
        true
    ),
    release_watch_stream_(Id, Stream),  % do not leak the connection on reconnect
    sleep(1),                           % reduce CPU load in case of persistent connection error
    !,                                  % cut here to avoid recursion stack on async loop
    watch_resources_loop(Callback, ApiGroup, Version, ResourceTypeName, State1, Options).

% Closes the stream of a finished watch connection. For a registered watcher
% the running(Stream) fact decides who closes the stream: it is closed here
% only when the fact can still be retracted, otherwise watcher_exit/2 has
% taken the stream over and closes it itself.
release_watch_stream_([], Stream) :-
    !,
    catch(close(Stream, [force(true)]), _, true).
release_watch_stream_(Id, Stream) :-
    (   retract(watcher_status(Id, running(Stream)))
    ->  catch(close(Stream, [force(true)]), _, true)
    ;   true
    ).

% watch_stream(:Goal, +HeartCallback, +Stream, +Id, +StateIn, -StateOut):
% reads the changes from the stream one by one until the exit of the
% watcher is requested. A read error is passed on as the change itself.
watch_stream(_, _, _, Id, State, State) :-
    watcher_status(Id, exit_request),
    !.
watch_stream(Goal, HeartCallback, Stream, Id, StateIn, StateOut) :-
    (   call(HeartCallback)
    ->  true
    ;   print_message(error, kubernetes(watcher_heartbeat_callback_failure, HeartCallback, Id))
    ),
    catch(
        (   peek_string(Stream, 4, _),
            json_read_dict(Stream, Change, [end_of_file(end_of_file)]),
            print_message(informational, kubernetes(watch_modification, Change))
        ),
        Error,
        Change = Error
    ),
    watch_modification_call(Goal, Id, Change, StateIn, State0),
    !,
    watch_stream(Goal, HeartCallback, Stream, Id, State0, StateOut),
    !.

% watcher_exit(+Id): stops the watcher thread, ignoring its exit status.
watcher_exit(Id) :-
    watcher_exit(Id, _).
646 647watcher_exit(Id, Status) :- 648 ( retract(k8s_client:watcher_status(Id, running(Stream))) 649 -> assertz(k8s_client:watcher_status(Id, exit_request)), 650 retractall(k8s_client:watcher_status(Id, running(_))), 651 close(Stream) 652 ; assertz(k8s_client:watcher_status(Id, exit_request)) 653 ), 654 thread_join(Id, Status), 655 ignore(retractall(k8s_client:watcher_status(Id,_)))