1:- module(stompl, [
2 connection/2, % +Address, -Connection
3 connection/3, % +Address, +CallbackDict, -Connection
4 setup/1, % +Connection
5 teardown/1, % +Connection
6 connect/3, % +Connection, +Host, +Headers
7 send/3, % +Connection, +Destination, +Headers
8 send/4, % +Connection, +Destination, +Headers, +Body
9 send_json/4, % +Connection, +Destination, +Headers, +JSON
10 subscribe/4, % +Connection, +Destination, +Id, +Headers
11 unsubscribe/2, % +Connection, +Id
12 ack/3, % +Connection, +MessageId, +Headers
13 nack/3, % +Connection, +MessageId, +Headers
14 begin/2, % +Connection, +Transaction
15 commit/2, % +Connection, +Transaction
16 abort/2, % +Connection, +Transaction
17 disconnect/2 % +Connection, +Headers
18 ]).
32:- use_module(library(uuid)). 33:- use_module(library(socket)). 34:- use_module(library(apply)). 35:- use_module(library(http/json)). 36 37:- dynamic 38 connection_mapping/2.
If CallbackDict is provided, it is associated with the connection
reference. Valid keys of the dict are:
on_connected on_disconnected on_message on_heartbeat_timeout on_error
When registering callbacks, both the module name and the predicate name must be given, in the form module:predicate. Example predicate signatures:
example:on_connected_handler(Connection, Headers, Body) example:on_disconnected_handler(Connection) example:on_message_handler(Connection, Headers, Body) example:on_heartbeat_timeout_handler(Connection) example:on_error_handler(Connection, Headers, Body)
%!  connection(+Address, -Connection).
%!  connection(+Address, +CallbackDict, -Connection).
%
%   Create a connection reference. Connection is unified with a fresh
%   UUID and a connection_mapping/2 fact is asserted that stores
%   Address and CallbackDict under the keys `address` and `callbacks`.
%   No socket is opened here.
%
%   connection/2 registers an empty callback dict. The dict goes in
%   the second argument: connection/3 takes the connection reference
%   as its last (output) argument.
connection(Address, Connection) :-
    connection(Address, _{}, Connection).

connection(Address, CallbackDict, Connection) :-
    uuid(Connection),
    asserta(connection_mapping(Connection,
                               _{ address: Address,
                                  callbacks: CallbackDict
                                })).
%!  setup(+Connection).
%
%   Open the TCP connection for Connection. The address stored in the
%   connection mapping is connected with tcp_connect/3, the stream
%   buffer is set to 4096, and a thread running receive/2 on that
%   stream is started. The receiver thread id and the stream are then
%   recorded in the mapping under `receiver_thread_id` and `stream`.
setup(Connection) :-
    get_mapping_data(Connection, address, Endpoint),
    tcp_connect(Endpoint, Socket, []),
    set_stream(Socket, buffer_size(4096)),
    thread_create(receive(Connection, Socket), Receiver, []),
    Update = _{receiver_thread_id: Receiver, stream: Socket},
    update_connection_mapping(Connection, Update).
%!  teardown(+Connection).
%
%   Stop the helper threads of Connection, close its stream and drop
%   its connection_mapping/2 fact.
%
%   A thread is only signalled (with `throw(kill)`) when it is still
%   running and is not the calling thread, so teardown/1 can be called
%   from a callback that runs on the receiver or heartbeat thread.
%   Errors raised by close/1 are ignored.
teardown(Connection) :-
    get_mapping_data(Connection, receiver_thread_id, ReceiverThreadId),
    (   \+ thread_self(ReceiverThreadId),
        thread_property(ReceiverThreadId, status(running))
    ->  debug(stompl, 'attempting to kill receive thread ~w', [ReceiverThreadId]),
        thread_signal(ReceiverThreadId, throw(kill))
    ;   true
    ),
    % The heartbeat thread only exists when heart-beating was
    % negotiated. Its absence, or a thread that has already stopped,
    % must not make teardown fail: the stream still has to be closed
    % and the mapping retracted.
    (   get_mapping_data(Connection, heartbeat_thread_id, HeartbeatThreadId),
        \+ thread_self(HeartbeatThreadId),
        thread_property(HeartbeatThreadId, status(running))
    ->  debug(stompl, 'attempting to kill heartbeat thread ~w', [HeartbeatThreadId]),
        thread_signal(HeartbeatThreadId, throw(kill))
    ;   true
    ),
    get_mapping_data(Connection, stream, Stream),
    catch(close(Stream), _, true),
    debug(stompl, 'retract connection mapping', []),
    retract(connection_mapping(Connection, _)).
Send a CONNECT frame. Protocol version and heartbeat negotiation are
handled. The STOMP frame is not used, for backward compatibility.
See the CONNECT frame section of the STOMP protocol specification.
%!  connect(+Connection, +Host, +Headers).
%
%   Build and send a CONNECT frame. Headers is extended with
%   `accept-version` set to '1.0,1.1' and `host` set to Host. When
%   Headers carries a `heart-beat` entry, its value is stored in the
%   connection mapping before the frame is sent.
connect(Connection, Host, Headers) :-
    put_dict(_{'accept-version': '1.0,1.1', host: Host},
             Headers,
             ConnectHeaders),
    create_frame('CONNECT', ConnectHeaders, '', Frame),
    (   get_dict('heart-beat', Headers, Heartbeat)
    ->  update_connection_mapping(Connection, _{'heart-beat': Heartbeat})
    ;   true
    ),
    send0(Connection, Frame).
Send a SEND frame. If content-type is not provided, text/plain is
used. content-length is filled in automatically.
%!  send(+Connection, +Destination, +Headers).
%!  send(+Connection, +Destination, +Headers, +Body).
%
%   Build and send a SEND frame whose `destination` header is
%   Destination. send/3 sends an empty body. See the SEND frame
%   section of the STOMP protocol specification.
send(Connection, Destination, Headers) :-
    send(Connection, Destination, Headers, '').

send(Connection, Destination, Headers, Body) :-
    put_dict(destination, Headers, Destination, SendHeaders),
    create_frame('SEND', SendHeaders, Body, Frame),
    send0(Connection, Frame).
Send a SEND frame with a JSON body. JSON can be either a JSON term or
a dict. content-type is set to application/json, and content-length is
filled in automatically as well.
See the SEND frame section of the STOMP protocol specification.
%!  send_json(+Connection, +Destination, +Headers, +JSON).
%
%   Serialise JSON to an atom with atom_json_term/3 and send it as the
%   body of a SEND frame. The `destination` header is set to
%   Destination and `content-type` to 'application/json'.
send_json(Connection, Destination, Headers, JSON) :-
    atom_json_term(Payload, JSON, [as(atom)]),
    put_dict(_{destination: Destination, 'content-type': 'application/json'},
             Headers,
             JsonHeaders),
    create_frame('SEND', JsonHeaders, Payload, Frame),
    send0(Connection, Frame).
Send a SUBSCRIBE frame.
See the SUBSCRIBE frame section of the STOMP protocol specification.
%!  subscribe(+Connection, +Destination, +Id, +Headers).
%
%   Build and send a SUBSCRIBE frame. Headers is extended with
%   `destination` set to Destination and `id` set to Id.
subscribe(Connection, Destination, Id, Headers) :-
    put_dict(_{destination: Destination, id: Id}, Headers, SubscribeHeaders),
    create_frame('SUBSCRIBE', SubscribeHeaders, '', Frame),
    send0(Connection, Frame).
Send an UNSUBSCRIBE frame.
See the UNSUBSCRIBE frame section of the STOMP protocol specification.
%!  unsubscribe(+Connection, +Id).
%
%   Build and send an UNSUBSCRIBE frame whose only header is `id`,
%   set to Id.
unsubscribe(Connection, Id) :-
    UnsubscribeHeaders = _{id: Id},
    create_frame('UNSUBSCRIBE', UnsubscribeHeaders, '', Frame),
    send0(Connection, Frame).
Send an ACK frame.
See the ACK frame section of the STOMP protocol specification.
%!  ack(+Connection, +MessageId, +Headers).
%
%   Build and send an ACK frame. Headers is extended with
%   `message-id` set to MessageId.
ack(Connection, MessageId, Headers) :-
    put_dict('message-id', Headers, MessageId, AckHeaders),
    create_frame('ACK', AckHeaders, '', Frame),
    send0(Connection, Frame).
Send a NACK frame.
See the NACK frame section of the STOMP protocol specification.
%!  nack(+Connection, +MessageId, +Headers).
%
%   Build and send a NACK frame. Headers is extended with
%   `message-id` set to MessageId.
nack(Connection, MessageId, Headers) :-
    put_dict('message-id', Headers, MessageId, NackHeaders),
    create_frame('NACK', NackHeaders, '', Frame),
    send0(Connection, Frame).
Send a BEGIN frame.
See the BEGIN frame section of the STOMP protocol specification.
%!  begin(+Connection, +Transaction).
%
%   Build and send a BEGIN frame whose only header is `transaction`,
%   set to Transaction.
begin(Connection, Transaction) :-
    TransactionHeaders = _{transaction: Transaction},
    create_frame('BEGIN', TransactionHeaders, '', Frame),
    send0(Connection, Frame).
Send a COMMIT frame.
See the COMMIT frame section of the STOMP protocol specification.
%!  commit(+Connection, +Transaction).
%
%   Build and send a COMMIT frame whose only header is `transaction`,
%   set to Transaction.
commit(Connection, Transaction) :-
    TransactionHeaders = _{transaction: Transaction},
    create_frame('COMMIT', TransactionHeaders, '', Frame),
    send0(Connection, Frame).
Send an ABORT frame.
See the ABORT frame section of the STOMP protocol specification.
%!  abort(+Connection, +Transaction).
%
%   Build and send an ABORT frame whose only header is `transaction`,
%   set to Transaction.
abort(Connection, Transaction) :-
    TransactionHeaders = _{transaction: Transaction},
    create_frame('ABORT', TransactionHeaders, '', Frame),
    send0(Connection, Frame).
Send a DISCONNECT frame.
%!  disconnect(+Connection, +Headers).
%
%   Build and send a DISCONNECT frame with the given Headers. See the
%   DISCONNECT frame section of the STOMP protocol specification.
disconnect(Connection, Headers) :-
    create_frame('DISCONNECT', Headers, '', Frame),
    send0(Connection, Frame).

%!  send0(+Connection, +Frame).
%!  send0(+Connection, +Frame, +EndWithNull).
%
%   Write Frame to the stream stored for Connection and flush it.
%   When EndWithNull is `true` (the default used by send0/2) a NUL
%   character is appended as frame terminator; any other value sends
%   Frame unchanged, which is what a bare heartbeat newline needs.
send0(Connection, Frame) :-
    send0(Connection, Frame, true).

send0(Connection, Frame, EndWithNull) :-
    terminate_frame(EndWithNull, Frame, Frame1),
    debug(stompl, 'frame to send~n~w', [Frame1]),
    get_mapping_data(Connection, stream, Stream),
    format(Stream, '~w', [Frame1]),
    flush_output(Stream).

% Append the NUL terminator to Frame0 only when EndWithNull is `true`.
terminate_frame(EndWithNull, Frame0, Frame) :-
    (   EndWithNull == true
    ->  atom_concat(Frame0, '\x00', Frame)
    ;   Frame = Frame0
    ).

%!  create_frame(+Command, +Headers, +Body, -Frame).
%
%   Frame is the atom Command, one `key:value` line per entry of
%   Headers, an empty line and then Body. For a non-empty Body the
%   `content-length` header is set to the character length of Body
%   and `content-type` defaults to 'text/plain' when absent. No NUL
%   terminator is added here.
create_frame(Command, Headers, Body, Frame) :-
    (   Body \= ''
    ->  atom_length(Body, Length),
        atom_number(LengthAtom, Length),
        put_dict('content-length', Headers, LengthAtom, Headers1),
        (   get_dict('content-type', Headers1, _)
        ->  Headers2 = Headers1
        ;   put_dict('content-type', Headers1, 'text/plain', Headers2)
        )
    ;   Headers2 = Headers
    ),
    create_header_lines(Headers2, HeaderLines),
    (   HeaderLines \= ''
    ->  atomic_list_concat([Command, HeaderLines], '\n', WithoutBody)
    ;   WithoutBody = Command
    ),
    atomic_list_concat([WithoutBody, Body], '\n\n', Frame).

% HeaderLines is the newline-separated `key:value` rendering of the
% Headers dict; it is '' for an empty dict.
create_header_lines(Headers, HeaderLines) :-
    dict_pairs(Headers, _, Pairs),
    maplist(create_header_line, Pairs, HeaderLines0),
    atomic_list_concat(HeaderLines0, '\n', HeaderLines).

create_header_line(K-V, HeaderLine) :-
    atomic_list_concat([K, V], ':', HeaderLine).

%!  receive(+Connection, +Stream).
%
%   Entry point of the receiver thread: start the receive loop with
%   an empty buffer.
receive(Connection, Stream) :-
    receive0(Connection, Stream, '').

% Receive loop. Buffered carries the unconsumed input between reads.
% On exception(disconnected) the `disconnected` callback is notified
% and the loop ends. Any other exception caught from receive_frames/4
% (for example the `kill` signal sent by teardown/1) makes this
% predicate fail, which also ends the loop.
receive0(Connection, Stream, Buffered) :-
    (   catch(receive_frames(Stream, Frames, Buffered, Buffered1), E, true)
    ->  (   nonvar(E)
        ->  E = exception(disconnected),
            debug(stompl, 'disconnected', []),
            notify(Connection, disconnected)
        ;   debug(stompl, 'frames received~n~w', [Frames]),
            handle_frames(Connection, Frames),
            debug(stompl, 'frames handled', []),
            receive0(Connection, Stream, Buffered1)
        )
    ).
%!  receive_frames(+Stream, -Frames, +Buffered0, -Buffered).
%
%   Read whatever input is pending on Stream and split it into
%   frames. Buffered0 is the unconsumed input of earlier reads and
%   Buffered what is left over after this one. A read that consists
%   of a single newline is returned as the one-element frame list
%   ['\n'] (a heartbeat) and leaves the buffer untouched.
%
%   @throws exception(disconnected) when Stream is at end of input.
receive_frames(Stream, Frames, Buffered0, Buffered) :-
    (   at_end_of_stream(Stream)
    ->  throw(exception(disconnected))
    ;   read_pending_input(Stream, Codes, [])
    ),
    atom_codes(Chars, Codes),
    debug(stompl, 'received~n~w', [Chars]),
    (   Chars = '\x0a'
    ->  Buffered = Buffered0,
        Frames = [Chars]
    ;   atom_concat(Buffered0, Chars, Buffered1),
        debug(stompl, 'current buffer~n~w', [Buffered1]),
        extract_frames(Frames, Buffered1, Buffered)
    ).

%!  extract_frames(-Frames, +Buffered0, -Buffered).
%
%   Frames is the list, in arrival order, of the complete
%   NUL-terminated frames at the start of Buffered0; Buffered is the
%   remaining input.
extract_frames(Frames, Buffered0, Buffered) :-
    extract_frames0([], Frames0, Buffered0, Buffered),
    reverse(Frames0, Frames).

% Accumulator version of extract_frames/3; frames are collected in
% reverse order.
extract_frames0(Frames, Frames, '', '') :- !.
extract_frames0(Frames0, Frames, Buffered0, Buffered) :-
    % The buffer always starts at a frame boundary. Newlines found
    % there are EOLs a server may send after the NUL terminator of the
    % previous frame; they are dropped so that they are not glued in
    % front of the next frame's command line.
    sub_atom(Buffered0, 0, 1, _, '\n'), !,
    sub_atom(Buffered0, 1, _, 0, Buffered1),
    extract_frames0(Frames0, Frames, Buffered1, Buffered).
extract_frames0(Frames, Frames, Buffered, Buffered) :-
    % No terminator yet: keep everything for the next read.
    \+ sub_atom(Buffered, _, 1, _, '\x00'), !.
extract_frames0(Frames0, Frames, Buffered0, Buffered) :-
    sub_atom(Buffered0, FrameLength, 1, _, '\x00'), !,
    sub_atom(Buffered0, 0, FrameLength, _, Frame),
    (   sub_atom(Frame, PreambleLength, 2, _, '\n\n')
    ->  % The frame has a body part. check_frame/6 honours
        % content-length, because the body itself may contain NUL.
        (   check_frame(Frame, Buffered0, FrameLength, PreambleLength,
                        Frame1, Buffered1)
        ->  Frames1 = [Frame1|Frames0],
            extract_frames0(Frames1, Frames, Buffered1, Buffered)
        ;   % Declared body not fully received yet: wait for more.
            Frames = Frames0,
            Buffered = Buffered0
        )
    ;   Frames1 = [Frame|Frames0],
        Rest is FrameLength + 1,
        sub_atom(Buffered0, Rest, _, 0, Buffered1),
        extract_frames0(Frames1, Frames, Buffered1, Buffered)
    ).
%!  check_frame(+Frame0, +Buffered0, +FrameLength, +PreambleLength,
%!              -Frame, -Buffered).
%
%   Frame0 is the text of Buffered0 up to its first NUL (FrameLength
%   characters) and PreambleLength is the offset of the empty line
%   that separates headers from body. When a content-length header
%   declares a body that extends beyond that first NUL, the frame is
%   re-cut at the declared size. Frame is the complete frame and
%   Buffered is what follows its terminating NUL. Fails when the
%   declared body has not been received completely.
check_frame(Frame0, Buffered0, FrameLength, PreambleLength, Frame, Buffered) :-
    (   read_content_length(Frame0, ContentLength)
    ->  FrameSize is PreambleLength + 2 + ContentLength
    ;   FrameSize = FrameLength
    ),
    (   FrameSize > FrameLength
    ->  % The body contains NUL characters. Both the declared body
        % and the terminator behind it must be in the buffer.
        atom_length(Buffered0, Available),
        FrameSize < Available,
        sub_atom(Buffered0, 0, FrameSize, _, Frame),
        Skip is FrameSize + 1
    ;   Frame = Frame0,
        Skip is FrameLength + 1
    ),
    sub_atom(Buffered0, Skip, _, 0, Buffered).

%!  read_content_length(+Frame, -Length).
%
%   Length is the number given by the first `content-length:` header
%   line of Frame. Only the lines before the first empty line are
%   inspected, so body text is never mistaken for the header. Fails
%   when there is no such header or its value is not a number.
read_content_length(Frame, Length) :-
    once(sub_atom(Frame, PreambleLength, 2, _, '\n\n')),
    sub_atom(Frame, 0, PreambleLength, _, Preamble),
    atomic_list_concat([_Command|HeaderLines], '\n', Preamble),
    member(HeaderLine, HeaderLines),
    atom_concat('content-length:', Value, HeaderLine), !,
    % Tolerate padding such as 'content-length: 12'.
    atomic_list_concat(Tokens, ' ', Value),
    last(Tokens, Token),
    atom_number(Token, Length).

%!  handle_frames(+Connection, +Frames).
%
%   Parse and process each frame text in Frames in order. A frame
%   that cannot be parsed is reported on the `stompl` debug topic and
%   skipped, so that one malformed frame does not make the receive
%   loop fail.
handle_frames(_, []) :- !.
handle_frames(Connection, [Frame|Frames]) :-
    (   parse_frame(Frame, ParsedFrame)
    ->  debug(stompl, 'parsed frame~n~w', [ParsedFrame]),
        process_frame(Connection, ParsedFrame)
    ;   debug(stompl, 'ignoring malformed frame~n~w', [Frame])
    ),
    handle_frames(Connection, Frames).

%!  parse_frame(+Frame, -ParsedFrame).
%
%   ParsedFrame is `_{cmd:heartbeat}` for a single newline, otherwise
%   a dict with keys `cmd`, `headers` (a dict) and `body`. Fails when
%   Frame has no empty line separating headers from body or a header
%   line has no colon.
parse_frame('\x0a', _{cmd:heartbeat}) :- !.
parse_frame(Frame, ParsedFrame) :-
    once(sub_atom(Frame, PreambleLength, 2, _, '\n\n')),
    sub_atom(Frame, 0, PreambleLength, _, Preamble),
    BodyStart is PreambleLength + 2,
    sub_atom(Frame, BodyStart, _, 0, Body),
    parse_headers(Preamble, Command, Headers),
    ParsedFrame = _{cmd:Command, headers:Headers, body:Body}.

% Split Preamble into the command line and a dict of headers.
parse_headers(Preamble, Command, Headers) :-
    atomic_list_concat([Command|HeaderLines], '\n', Preamble),
    parse_headers0(HeaderLines, _{}, Headers).

% Fold the header lines into a dict. A line is split at its FIRST
% colon, so values may contain further colons. When a key occurs
% more than once the first occurrence wins.
parse_headers0([], Headers, Headers).
parse_headers0([Line|Lines], Headers0, Headers) :-
    once(sub_atom(Line, KeyLength, 1, ValueLength, ':')),
    sub_atom(Line, 0, KeyLength, _, Key0),
    sub_atom(Line, _, ValueLength, 0, Value0),
    replace(Key0, Key),
    (   get_dict(Key, Headers0, _)
    ->  Headers1 = Headers0
    ;   strip_leading_spaces(Value0, Value1),
        replace(Value1, Value),
        put_dict(Key, Headers0, Value, Headers1)
    ),
    parse_headers0(Lines, Headers1, Headers).

% Value is the longest suffix of Value0 that does not start with a
% space.
strip_leading_spaces(Value0, Value) :-
    sub_atom(Value0, _, _, 0, Value),
    \+ sub_atom(Value, 0, 1, _, ' '), !.

%!  replace(+Escaped, -Plain).
%
%   Decode the header escapes `\n`, `\r`, `\c` (colon) and `\\`.
%   Decoding is done in a single left-to-right pass so that an
%   escaped backslash followed by `n`, `r` or `c` is not decoded a
%   second time. Other backslash sequences are kept as they are.
replace(A0, A) :-
    atom_codes(A0, Codes0),
    unescape_codes(Codes0, Codes),
    atom_codes(A, Codes).

unescape_codes([], []).
unescape_codes([0'\\, Escaped|T0], [Code|T]) :-
    escape_code(Escaped, Code), !,
    unescape_codes(T0, T).
unescape_codes([Code|T0], [Code|T]) :-
    unescape_codes(T0, T).

escape_code(0'n, 0'\n).
escape_code(0'r, 0'\r).
escape_code(0'c, 0':).
escape_code(0'\\, 0'\\).

%!  process_frame(+Connection, +Frame).
%
%   Act on a parsed frame. A heartbeat records the current time as
%   `received_heartbeat` in the connection mapping. Any other frame
%   is passed to the callback named after its lower-cased command; a
%   CONNECTED frame first starts the heartbeat thread when both sides
%   supplied a `heart-beat` header.
process_frame(Connection, Frame) :-
    get_dict(cmd, Frame, heartbeat), !,
    get_time(Now),
    debug(stompl, 'received heartbeat at ~w', [Now]),
    update_connection_mapping(Connection, _{received_heartbeat:Now}).
process_frame(Connection, Frame) :-
    get_dict(cmd, Frame, Command),
    downcase_atom(Command, FrameType),
    (   FrameType == connected
    ->  get_dict(headers, Frame, Headers),
        start_heartbeat_if_required(Connection, Headers)
    ;   true
    ),
    notify(Connection, FrameType, Frame).

% Start heart-beating only if the client asked for it in connect/3
% and the server answered with a heart-beat header.
start_heartbeat_if_required(Connection, Headers) :-
    (   get_mapping_data(Connection, 'heart-beat', CHB),
        get_dict('heart-beat', Headers, SHB)
    ->  start_heartbeat(Connection, CHB, SHB)
    ;   true
    ).

%!  start_heartbeat(+Connection, +CHB, +SHB).
%
%   CHB and SHB are the client and server `heart-beat` header values
%   ("x,y" in milliseconds). Unless the negotiated result is 0,0 a
%   thread running heartbeat_loop/5 is created and its id, together
%   with the current time as `received_heartbeat`, is stored in the
%   connection mapping.
start_heartbeat(Connection, CHB, SHB) :-
    extract_heartbeats(CHB, CX, CY),
    extract_heartbeats(SHB, SX, SY),
    calculate_heartbeats(CX-CY, SX-SY, X-Y),
    X-Y \= 0-0, !,
    debug(stompl, 'calculated heartbeats are ~w,~w', [X, Y]),
    heartbeat_intervals(X, Y, SendSleep, ReceiveSleep, SleepTime),
    get_time(Now),
    thread_create(heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime, Now),
                  HeartbeatThreadId, []),
    update_connection_mapping(Connection,
                              _{ heartbeat_thread_id:HeartbeatThreadId,
                                 received_heartbeat:Now
                               }).
start_heartbeat(_, _, _).

% Convert the negotiated intervals X (send) and Y (receive), both in
% milliseconds, into seconds. The receive interval gets 2 seconds of
% grace. A direction negotiated as 0 is disabled: its interval is
% infinite, so the `>` comparisons in heartbeat_loop/5 never fire for
% it. The gcd is taken on whole milliseconds because gcd/2 only
% accepts integers; intervals that are not whole seconds would
% otherwise raise a type error.
heartbeat_intervals(X, Y, SendSleep, ReceiveSleep, SleepTime) :-
    SendMs is round(X),
    (   round(Y) =:= 0
    ->  ReceiveMs = 0
    ;   ReceiveMs is round(Y) + 2000
    ),
    interval_seconds(SendMs, SendSleep),
    interval_seconds(ReceiveMs, ReceiveSleep),
    (   SendMs =:= 0
    ->  SleepTime = ReceiveSleep
    ;   ReceiveMs =:= 0
    ->  SleepTime = SendSleep
    ;   SleepTime is gcd(SendMs, ReceiveMs) / 2000
    ).

interval_seconds(0, Seconds) :- !,
    Seconds is inf.
interval_seconds(Ms, Seconds) :-
    Seconds is Ms / 1000.

% Parse a "x,y" heart-beat header value, ignoring spaces, into the
% numbers X and Y.
extract_heartbeats(Heartbeat, X, Y) :-
    atomic_list_concat(Parts, ' ', Heartbeat),
    atomic_list_concat(Parts, '', Compact),
    atomic_list_concat([X0, Y0], ',', Compact),
    atom_number(X0, X),
    atom_number(Y0, Y).
%!  calculate_heartbeats(+Client, +Server, -Negotiated).
%
%   Client is CX-CY and Server is SX-SY, the two numbers of each
%   side's heart-beat header. Negotiated is X-Y, where X combines CX
%   with SY and Y combines CY with SX: the larger of the two, or 0
%   when either of them is 0.
calculate_heartbeats(CX-CY, SX-SY, X-Y) :-
    negotiate_interval(CX, SY, X),
    negotiate_interval(CY, SX, Y).

negotiate_interval(Own, Peer, Interval) :-
    (   Own \= 0, Peer \= 0
    ->  Interval is max(Own, floor(Peer))
    ;   Interval = 0
    ).

%!  heartbeat_loop(+Connection, +SendSleep, +ReceiveSleep, +SleepTime, +SendTime).
%
%   Endless loop run by the heartbeat thread. Every SleepTime seconds
%   it sends a bare newline if more than SendSleep seconds have passed
%   since SendTime, and notifies the `heartbeat_timeout` callback if
%   the `received_heartbeat` time in the connection mapping is more
%   than ReceiveSleep seconds old.
heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime, SendTime) :-
    sleep(SleepTime),
    get_time(Now),
    (   Now - SendTime > SendSleep
    ->  debug(stompl, 'sending a heartbeat message at ~w', [Now]),
        send0(Connection, '\x0a', false),
        NextSendTime = Now
    ;   NextSendTime = SendTime
    ),
    get_mapping_data(Connection, received_heartbeat, LastReceived),
    Elapsed is Now - LastReceived,
    (   Elapsed > ReceiveSleep
    ->  debug(stompl,
              'heartbeat timeout: diff_receive=~w, time=~w, lastrec=~w',
              [Elapsed, Now, LastReceived]),
        notify(Connection, heartbeat_timeout)
    ;   true
    ),
    heartbeat_loop(Connection, SendSleep, ReceiveSleep, SleepTime, NextSendTime).

%!  notify(+Connection, +FrameType).
%!  notify(+Connection, +FrameType, +Frame).
%
%   Call the callback registered under the key `on_<FrameType>` in the
%   callback dict of Connection, if there is one. notify/2 calls it
%   with Connection only; notify/3 also passes the headers and body of
%   Frame. Failure of the callback is ignored.
notify(Connection, FrameType) :-
    get_mapping_data(Connection, callbacks, Callbacks),
    atom_concat(on_, FrameType, CallbackKey),
    (   get_dict(CallbackKey, Callbacks, Handler)
    ->  debug(stompl, 'callback predicate ~w', [Handler]),
        ignore(call(Handler, Connection))
    ;   true
    ).

notify(Connection, FrameType, Frame) :-
    get_mapping_data(Connection, callbacks, Callbacks),
    atom_concat(on_, FrameType, CallbackKey),
    (   get_dict(CallbackKey, Callbacks, Handler)
    ->  debug(stompl, 'callback predicate ~w', [Handler]),
        ignore(call(Handler, Connection, Frame.headers, Frame.body))
    ;   true
    ).

%!  get_mapping_data(+Connection, +Key, -Value).
%
%   Value is stored under Key in the dict recorded for Connection.
%   Fails when Connection has no mapping or the dict has no such key.
get_mapping_data(Connection, Key, Value) :-
    connection_mapping(Connection, Mapping),
    get_dict(Key, Mapping, Value).

%!  update_connection_mapping(+Connection, +Dict).
%
%   Replace the connection_mapping/2 fact of Connection by one whose
%   dict is the old dict with the entries of Dict added or replaced.
update_connection_mapping(Connection, Dict) :-
    connection_mapping(Connection, Current),
    retract(connection_mapping(Connection, Current)),
    put_dict(Dict, Current, Updated),
    asserta(connection_mapping(Connection, Updated)).
STOMP client.
A STOMP 1.0 and 1.1 compatible client.
stomp.py is used as a reference for the implementation.