% Module header: the public interface consists of the two subscription
% predicates and the matching unsubscribe predicate.
:- module(rdf_notification,
          [ rdf_subscribe/3,     % +Topic, +Callback, -SubscriberId
            rdf_subscribe/4,     % +Topic, +Callback, +Options, -SubscriberId
            rdf_unsubscribe/1    % +SubscriberId
          ]).
:- use_module(library(option)).
:- use_module(library(gensym)).
:- use_module(library(semweb/rdf_db)).
:- use_module(library(thread_pool)).

% Topic is an RDF resource, so prefix notation (e.g. rdf:type) is expanded.
:- rdf_meta
    rdf_subscribe(r, +, -),
    rdf_subscribe(r, +, +, -).

% Start-up, in this order:
%   1. create the thread that owns the subscriber dict and serialises all
%      subscribe/unsubscribe/event messages;
%   2. create the pool in which callbacks are run (one thread per CPU plus
%      one);
%   3. register with rdf_monitor/2 so that triple changes are forwarded to
%      the main thread.
:- initialization((
    thread_create(rdf_notification_main(_{}), _,
                  [alias(rdf_notification_main_thread)]),
    % The cpu_count flag is not defined on every platform; without a
    % fallback the whole initialization goal would fail before the pool
    % and the monitor are set up.
    (   current_prolog_flag(cpu_count, CPUs)
    ->  true
    ;   CPUs = 1
    ),
    Size is CPUs + 1,
    thread_pool_create(rdf_notification_caller_thread_pool, Size, []),
    rdf_monitor(rdf_notification:rdf_event_received,
                [assert,assert(load),retract,update])
)).
%   Subscribe to Topic, which can be a subject, predicate or object. When an
%   assert, retract or update event happens on the topic, Callback will be
%   invoked. Upon successful subscription, SubscriberId is unified with an
%   identifier that can later be used to unsubscribe.
%   Callback should be prefixed with a module name.
%   Doing intensive or long-running computation in Callback is strongly
%   discouraged; if that is required, a separate thread must be used.
%   Supported options are:
%
%     * type(+Type)
%       One of `subject` (default), `predicate` or `object`, indicating
%       what to subscribe to.
%     * monitor_descendant(+Boolean)
%       Whether descendants of Topic should also be monitored. The default
%       value is `false` and it is only applicable when type is `subject`.
%       For example, if Topic is `a` and this option is `true`, and there
%       is a triple =rdf(a, b, c)= already in the database,
%       =rdf_assert(c, d, e)= will trigger a callback to subscribers of `a`.
%   N.B. subscribing to the same Topic with the same Callback and Options
%   will result in a new subscriber without having any impact on the
%   existing one. If this is not the wanted result, unsubscribe the
%   existing one first.
%!  rdf_subscribe(+Topic, +Callback, -SubscriberId)
%
%   Shorthand for rdf_subscribe/4 with the options type(subject) and
%   monitor_descendant(false).
rdf_subscribe(Topic, Callback, SubscriberId) :-
    Defaults = [type(subject), monitor_descendant(false)],
    rdf_subscribe(Topic, Callback, Defaults, SubscriberId).

%!  rdf_subscribe(+Topic, +Callback, +Options, -SubscriberId)
%
%   Register Callback for Topic by posting a request to the notification
%   main thread. The `type` and `monitor_descendant` options are filled in
%   with their defaults (`subject` and `false`) when absent and are placed
%   in front of the remaining options. SubscriberId is a fresh `sub_N`
%   symbol produced by gensym/2.
rdf_subscribe(Topic, Callback, Options, SubscriberId) :-
    select_option(monitor_descendant(Descend), Options, Options1, false),
    select_option(type(Kind), Options1, Extra, subject),
    Normalised = [type(Kind), monitor_descendant(Descend)|Extra],
    gensym(sub_, SubscriberId),
    Request = rdf_subscribe(Topic, SubscriberId, Callback, Normalised),
    thread_send_message(rdf_notification_main_thread, Request).
%   Unsubscribe the subscriber identified by SubscriberId. Unsubscribing a
%   non-existing subscriber does not have any impact.
%!  rdf_unsubscribe(+SubscriberId)
%
%   Ask the notification main thread to drop the subscriber registered
%   under SubscriberId. The request is only posted here; it is processed
%   by the main thread's message loop.
rdf_unsubscribe(Id) :-
    Request = rdf_unsubscribe(Id),
    thread_send_message(rdf_notification_main_thread, Request).
%   rdf_event_received(+Event)
%
%   Hook registered with rdf_monitor/2: forwards every received Event
%   unchanged to the notification main thread's message queue.
rdf_event_received(Event) :-
    Target = rdf_notification_main_thread,
    thread_send_message(Target, Event).
%   rdf_notification_main(+Subscribers:dict)
%
%   Message loop of the notification main thread. Subscribers maps a topic
%   to a list of subscriber(SubscriberId, Callback, Options) terms. Each
%   message yields the state used for the next iteration; the loop does
%   not terminate by itself.
rdf_notification_main(Subscribers) :-
    thread_get_message(Message),
    debug(rdf_notification, 'received message ~w', [Message]),
    % A failing or erroneous handler must not take the whole service
    % down: report the error and carry on with the previous state.
    (   catch(handle_message(Subscribers, Message, NewSubscribers),
              error(Formal, Context),
              ( print_message(error, error(Formal, Context)),
                fail
              ))
    ->  true
    ;   NewSubscribers = Subscribers
    ),
    rdf_notification_main(NewSubscribers).

%   handle_message(+SubscribersDict, +Message, -NewSubscribersDict)
%
%   Process one message:
%
%     * rdf_subscribe(Topic, SubscriberId, Callback, Options) prepends a
%       subscriber to the list stored under Topic;
%     * rdf_unsubscribe(SubscriberId) removes the first subscriber with
%       that id and drops the topic when its list becomes empty; an
%       unknown id leaves the dict untouched;
%     * anything else is treated as an rdf_monitor/2 event and dispatched
%       to subscribers without changing the dict.
handle_message(SubscribersDict,
               rdf_subscribe(Topic, SubscriberId, Callback, Options),
               NewSubscribersDict) :- !,
    (   get_dict(Topic, SubscribersDict, Old)
    ->  true
    ;   Old = []
    ),
    put_dict(Topic, SubscribersDict,
             [subscriber(SubscriberId, Callback, Options)|Old],
             NewSubscribersDict),
    debug(rdf_notification, 'SubscribersDict ~w', [NewSubscribersDict]).

handle_message(SubscribersDict, rdf_unsubscribe(SubscriberId), NewSubscribersDict) :-
    % Enumerate topics on backtracking until one holds the subscriber.
    get_dict(Topic, SubscribersDict, Old),
    selectchk(subscriber(SubscriberId, _, _), Old, Rest), !,
    (   Rest == []
    ->  del_dict(Topic, SubscribersDict, _, NewSubscribersDict)
    ;   put_dict(Topic, SubscribersDict, Rest, NewSubscribersDict)
    ),
    debug(rdf_notification, 'SubscribersDict ~w', [NewSubscribersDict]).
% Unknown subscriber: hand the dict back unchanged. Leaving the output
% unbound here would make the main loop continue with a variable as its
% state, losing every subscription.
handle_message(SubscribersDict, rdf_unsubscribe(_), SubscribersDict) :- !.

handle_message(SubscribersDict, Message, SubscribersDict) :-
    rdf_notify(SubscribersDict, Message).

%   rdf_notify(+SubscribersDict, +Event)
%
%   Dispatch Event to the subscribers of its subject (directly and through
%   monitored ancestors), its predicate and its object. The first four
%   arguments of Event are taken as Subject, Predicate, Object and DB.
%
%   NOTE(review): for update events the fourth argument is presumably the
%   update action rather than a graph, so a db(_) filter would never match
%   those events -- confirm against the rdf_monitor/2 documentation.
rdf_notify(SubscribersDict, Event) :-
    Event =.. [_,Subject,Predicate,Object,DB|_],
    rdf_notify_direct_subscribers(SubscribersDict, event_meta(subject, Subject, DB), Event),
    rdf_notify_indirect_subscribers(SubscribersDict, Subject, DB, Event),
    rdf_notify_direct_subscribers(SubscribersDict, event_meta(predicate, Predicate, DB), Event),
    rdf_notify_direct_subscribers(SubscribersDict, event_meta(object, Object, DB), Event).

%   rdf_notify_direct_subscribers(+SubscribersDict, +EventMeta, +Event)
%
%   Notify the subscribers registered for exactly the topic named in
%   EventMeta = event_meta(Type, Topic, DB).
rdf_notify_direct_subscribers(SubscribersDict, EventMeta, Event) :-
    debug(rdf_notification, 'received event ~w with meta ~w', [Event,EventMeta]),
    rdf_notify_direct_subscribers0(SubscribersDict, EventMeta, Event, false).
%   rdf_notify_direct_subscribers0(+SubscribersDict, +EventMeta, +Event,
%                                  +FromDescendant)
%
%   For every subscriber of Topic accepted by select_subscriber/5, run
%   call(Callback, SubscriberId, Topic, Event) in the caller thread pool.
%   Succeeds without doing anything when Topic has no subscribers.
rdf_notify_direct_subscribers0(SubscribersDict, event_meta(Type, Topic, DB), Event, FromDescendant) :-
    (   topic_subscribers(SubscribersDict, Topic, Subscribers)
    ->  % forall/2 rather than foreach/2: foreach/2 keeps bindings of goal
        % variables that do not occur in the generator, so the thread-id
        % output of the first call would be passed, already bound, to the
        % call made for the second subscriber.
        forall(select_subscriber(subscriber(SubscriberId, Callback, _),
                                 Subscribers, Type, DB, FromDescendant),
               thread_create_in_pool(rdf_notification_caller_thread_pool,
                                     ignore(call(Callback, SubscriberId, Topic, Event)),
                                     _,
                                     % nobody joins these workers
                                     [detached(true)]))
    ;   true
    ).

%   topic_subscribers(+SubscribersDict, +Topic, -Subscribers)
%
%   Subscribers is the list stored under Topic; fails when there is none.
%   A Topic that is not a valid dict key (e.g. a literal object, which is
%   a compound term) is presumably rejected by get_dict/3 with a type
%   error; that case is treated as "no subscribers".
topic_subscribers(SubscribersDict, Topic, Subscribers) :-
    catch(get_dict(Topic, SubscribersDict, Subscribers),
          error(type_error(_, _), _),
          fail).

%   select_subscriber(?Subscriber, +Subscribers, +Type, +DB, +FromDescendant)
%
%   Enumerate on backtracking the members of Subscribers that
%
%     - have the option type(Type),
%     - either have no db(_) option or have one that unifies with DB, and
%     - when FromDescendant is `true`, i.e. the event happened on a
%       descendant of the subscribed topic, have monitor_descendant(true).
select_subscriber(Subscriber, Subscribers, Type, DB, FromDescendant) :-
    member(Subscriber, Subscribers),
    Subscriber = subscriber(_, _, Options),
    option(type(Type), Options),
    (   option(db(DB0), Options)
    ->  DB0 = DB
    ;   true
    ),
    (   FromDescendant == true
    ->  option(monitor_descendant(true), Options)
    ;   true
    ).

%   rdf_notify_indirect_subscribers(+SubscribersDict, +Object, +DB, +Event)
%
%   Notify subscribers of the ancestors of Object: every resource that
%   reaches Object through a chain of triples in which it is the subject.
%   Only subject subscribers with monitor_descendant(true) are selected.
rdf_notify_indirect_subscribers(SubscribersDict, Object, DB, Event) :-
    rdf_notify_indirect_subscribers0(SubscribersDict, Object, DB, Event, [Object], _).

%   Seen0/Seen thread the resources already visited through the traversal,
%   which guards against cycles in the graph.
rdf_notify_indirect_subscribers0(SubscribersDict, Object, DB, Event, Seen0, Seen) :-
    findall(Subject,
            ( rdf(Subject, _, Object),   % there might be cross-db reference
              \+ memberchk(Subject, Seen0)
            ),
            Subjects),
    rdf_notify_indirect_subscribers1(SubscribersDict, Subjects, DB, Event, Seen0, Seen).

rdf_notify_indirect_subscribers1(_, [], _, _, Seen, Seen) :- !.
rdf_notify_indirect_subscribers1(SubscribersDict, [H|T], DB, Event, Seen0, Seen) :-
    % Subjects was collected before recursing, so H may occur twice in it
    % (several triples towards the same object) or may have been visited
    % while processing an earlier sibling; notify each ancestor only once.
    (   memberchk(H, Seen0)
    ->  Seen1 = Seen0
    ;   rdf_notify_direct_subscribers0(SubscribersDict, event_meta(subject, H, DB), Event, true),
        rdf_notify_indirect_subscribers0(SubscribersDict, H, DB, Event, [H|Seen0], Seen1)
    ),
    rdf_notify_indirect_subscribers1(SubscribersDict, T, DB, Event, Seen1, Seen).
RDF Notification
RDF Notification.