1:- module(rdf_notification,
2 [
3 rdf_subscribe/3, % +Topic, +Callback, -SubscriberId
4 rdf_subscribe/4, % +Topic, +Callback, +Options, -SubscriberId
5 rdf_unsubscribe/1 % +SubscriberId
6 ]).
15:- use_module(library(option)). 16:- use_module(library(gensym)). 17:- use_module(library(semweb/rdf_db)). 18:- use_module(library(thread_pool)). 19 20:- rdf_meta 21 rdf_subscribe(r, +, -), 22 rdf_subscribe(r, +, +, -). 23 24:- initialization 25 thread_create(rdf_notification_main(_{}), _, [alias(rdf_notification_main_thread)]), 26 current_prolog_flag(cpu_count, CPUs), 27 Size is CPUs + 1, 28 thread_pool_create(rdf_notification_caller_thread_pool, Size, []), 29 rdf_monitor(rdf_notification:rdf_event_received, [assert,assert(load),retract,update]).
Topic which could be subject, predicate or object. When assert,
retract or update event happens on the topic, Callback will be invoked. Upon successful
subscription, SubscriberId will be unified and can be later on used to unsubscribe.
Callback should be prefixed with a module name.
It is highly NOT recommended to do any intensive or long time computation in Callback,
however if that is required, a separated thread must be used.
Supported options are:
subject (default), predicate or object, indicating what to subscribe.Topic should also be monitored. The default value is false
and it's only applicable when type is subject. For example if Topic is a and this option is
true, and there is a triple =rdf(a, b, c)= already in database, =rdf_assert(c, d, e)= will
trigger a callback to subscribers of a.
N.B. subscribing to the same Topic with the same Callback and Options will result a new
subscribers without having any impact on the existing one. If this is not the wanted result,
unsubscribe existing one first.
61rdf_subscribe(Topic, Callback, SubscriberId) :- 62 rdf_subscribe(Topic, Callback, [type(subject),monitor_descendant(false)], SubscriberId). 63 64rdf_subscribe(Topic, Callback, Options, SubscriberId) :- 65 select_option(monitor_descendant(MonitorDescendant), Options, RestOptions1, false), 66 select_option(type(Type), RestOptions1, RestOptions2, subject), 67 gensym(sub_, SubscriberId), 68 thread_send_message(rdf_notification_main_thread, 69 rdf_subscribe(Topic, SubscriberId, Callback, 70 [type(Type),monitor_descendant(MonitorDescendant)|RestOptions2]) 71 ).
SubscriberId. Unsubscribing a non-existing subscriber
does not have any impact.
78rdf_unsubscribe(SubscriberId) :-
79 thread_send_message(rdf_notification_main_thread,
80 rdf_unsubscribe(SubscriberId)).
83rdf_event_received(Event) :-
84 thread_send_message(rdf_notification_main_thread, Event).87rdf_notification_main(Subscribers) :- 88 thread_get_message(Message), 89 debug(rdf_notification, 'received message ~w', [Message]), 90 handle_message(Subscribers, Message, NewSubscribers), 91 rdf_notification_main(NewSubscribers). 92 93handle_message(SubscribersDict, rdf_subscribe(Topic, SubscriberId, Callback, Options), 94 NewSubscribersDict) :- !, 95 ( Old = SubscribersDict.get(Topic) 96 -> true 97 ; Old = [] 98 ), 99 NewSubscribersDict = SubscribersDict.put(Topic, 100 [subscriber(SubscriberId, Callback, Options)|Old]), 101 debug(rdf_notification, 'SubscribersDict ~w', [NewSubscribersDict]). 102 103handle_message(SubscribersDict, rdf_unsubscribe(SubscriberId), NewSubscribersDict) :- 104 get_dict(Topic, SubscribersDict, Old), 105 selectchk(subscriber(SubscriberId, _, _), Old, Rest), !, 106 ( Rest \= [] 107 -> NewSubscribersDict = SubscribersDict.put(Topic, Rest) 108 ; del_dict(Topic, SubscribersDict, _, NewSubscribersDict) 109 ), 110 debug(rdf_notification, 'SubscribersDict ~w', [NewSubscribersDict]). 111handle_message(_, rdf_unsubscribe(_), _) :- !. 112 113handle_message(SubscribersDict, Message, SubscribersDict) :- 114 rdf_notify(SubscribersDict, Message). 115 116rdf_notify(SubscribersDict, Event) :- 117 Event =.. [_,Subject,Predicate,Object,DB|_], 118 rdf_notify_direct_subscribers(SubscribersDict, event_meta(subject, Subject, DB), Event), 119 rdf_notify_indirect_subscribers(SubscribersDict, Subject, DB, Event), 120 rdf_notify_direct_subscribers(SubscribersDict, event_meta(predicate, Predicate, DB), Event), 121 rdf_notify_direct_subscribers(SubscribersDict, event_meta(object, Object, DB), Event). 122 123rdf_notify_direct_subscribers(SubscribersDict, EventMeta, Event) :- 124 debug(rdf_notification, 'received event ~w with meta ~w', [Event,EventMeta]), 125 rdf_notify_direct_subscribers0(SubscribersDict, EventMeta, Event, false). 126 127rdf_notify_direct_subscribers0(SubscribersDict, event_meta(Type, Topic, DB), Event, FromDescendant) :- 128 ( Subscribers = SubscribersDict.get(Topic) 129 -> foreach(select_subscriber(subscriber(SubscriberId, Callback, _), 130 Subscribers, Type, DB, FromDescendant), 131 thread_create_in_pool(rdf_notification_caller_thread_pool, 132 ignore(call(Callback, SubscriberId, Topic, Event)), _, []) 133 ) 134 ; true 135 ). 136 137select_subscriber(Subscriber, Subscribers, Type, DB, FromDescendant) :- 138 member(Subscriber, Subscribers), 139 Subscriber = subscriber(_, _, Options), 140 option(type(Type), Options), 141 ( option(db(DB0), Options) 142 -> DB0 = DB 143 ; true 144 ), 145 ( 146 -> option(monitor_descendant(true), Options) 147 ; true 148 ). 149 150rdf_notify_indirect_subscribers(SubscribersDict, Object, DB, Event) :- 151 rdf_notify_indirect_subscribers0(SubscribersDict, Object, DB, Event, [Object], _). 152 153rdf_notify_indirect_subscribers0(SubscribersDict, Object, DB, Event, Seen0, Seen) :- 154 findall(Subject, 155 ( 156 rdf(Subject, _, Object), % there might be cross-db reference 157 \+ memberchk(Subject, Seen0) 158 ), 159 Subjects), 160 rdf_notify_indirect_subscribers1(SubscribersDict, Subjects, DB, Event, Seen0, Seen). 161 162rdf_notify_indirect_subscribers1(_, [], _, _, Seen, Seen) :- !. 163rdf_notify_indirect_subscribers1(SubscribersDict, [H|T], DB, Event, Seen0, Seen) :- 164 rdf_notify_direct_subscribers0(SubscribersDict, event_meta(subject, H, DB), Event, true), 165 rdf_notify_indirect_subscribers0(SubscribersDict, H, DB, Event, [H|Seen0], Seen1), 166 rdf_notify_indirect_subscribers1(SubscribersDict, T, DB, Event, Seen1, Seen)
RDF Notification
RDF Notification.