37
:- module(thread_httpd,
          [ http_current_server/2,
            http_server_property/2,
            http_server/2,
            http_workers/2,
            http_add_worker/2,
            http_current_worker/2,
            http_stop_server/2,
            http_spawn/2,

            http_requeue/1,
            http_close_connection/1,
            http_enough_workers/3
          ]).

% Libraries this module loads.
:- use_module(library(debug)).
:- use_module(library(error)).
:- use_module(library(option)).
:- use_module(library(socket)).
:- use_module(library(thread_pool)).
:- use_module(library(gensym)).
:- use_module(http_wrapper).
:- use_module(http_path).
:- use_module(http_stream).

% Predicates that are loaded lazily.
:- autoload(library(uri), [uri_resolve/3]).
:- autoload(library(aggregate), [aggregate_all/3]).

% Option declarations for the option-processing predicates.
:- predicate_options(http_server/2, 2,
                     [ port(any),
                       unix_socket(atom),
                       entry_page(atom),
                       tcp_socket(any),
                       workers(positive_integer),
                       timeout(number),
                       keep_alive_timeout(number),
                       silent(boolean),
                       ssl(list(any)),
                       pass_to(system:thread_create/3, 3)
                     ]).
:- predicate_options(http_spawn/2, 2,
                     [ pool(atom),
                       pass_to(system:thread_create/3, 3),
                       pass_to(thread_pool:thread_create_in_pool/4, 4)
                     ]).
:- predicate_options(http_add_worker/2, 2,
                     [ timeout(number),
                       keep_alive_timeout(number),
                       max_idle_time(number),
                       pass_to(system:thread_create/3, 3)
                     ]).

:- meta_predicate
    http_server(1, :),
    http_current_server(1, ?),
    http_spawn(0, +).

% Server bookkeeping:
%   current_server(Port, Goal, Alias, Queue, Scheme, StartTime)
%   queue_worker(Queue, ThreadId)
%   queue_options(Queue, Options)
:- dynamic
    current_server/6,
    queue_worker/2,
    queue_options/2.

% Hooks that other modules may define.
:- multifile
    make_socket_hook/3,
    accept_hook/2,
    close_hook/1,
    open_client_hook/6,
    discard_client_hook/1,
    http:create_pool/1,
    http:schedule_workers/1.

:- meta_predicate
    thread_repeat_wait(0).
200
%!  http_server(:Goal, :Options)
%
%   Start an HTTP server that hands requests to Goal.  The listen
%   address is derived from the port(Port) or unix_socket(Path)
%   option.  The server is set up in three steps: obtain the listen
%   socket, start the worker threads and start the accept thread.
%   Unless the option silent(true) is given, an informational
%   `httpd_started_server` message is printed afterwards.
%
%   @error existence_error(server_address, Options) if no address can
%          be derived from Options.

http_server(Goal, Module:UserOptions) :-
    (   server_address(Address, UserOptions)
    ->  make_socket(Address, Module:UserOptions, ServerOptions),
        create_workers(ServerOptions),
        create_server(Goal, Address, ServerOptions),
        (   option(silent(true), UserOptions)
        ->  true
        ;   print_message(informational,
                          httpd_started_server(Address, UserOptions))
        )
    ;   existence_error(server_address, UserOptions)
    ).
214
%!  server_address(-Address, +Options)
%
%   Address is the listen address described by Options: the value of
%   the port(Port) option if present, otherwise unix_socket(Path) for
%   a unix_socket(Path) option.  Fails if neither option is present.

server_address(Address, Options) :-
    option(port(Port), Options),
    !,
    Address = Port.
server_address(Address, Options) :-
    option(unix_socket(Path), Options),
    !,
    Address = unix_socket(Path).
221
%!  address_port(+Address, -Port)
%
%   Map a listen address onto its "port" part: the port of an
%   Interface:Port pair, the path of unix_socket(Path) and the address
%   itself in all other cases.

address_port(_Interface:PortNumber, PortNumber) :- !.
address_port(unix_socket(SocketPath), SocketPath) :- !.
address_port(Plain, Plain).
225
%!  tcp_address(@Address)
%
%   True when Address denotes a TCP address: an unbound variable, an
%   integer port or an Interface:Port pair.  An unbound Address is
%   left unbound.

tcp_address(Address) :-
    (   var(Address)
    ->  true
    ;   integer(Address)
    ->  true
    ;   Address = _Interface:_PortNumber
    ).
233
%!  address_domain(+Address, -Domain)
%
%   Domain is the socket domain to use for Address.  It is `inet6`
%   only if Address is an Interface:Port pair whose interface is not
%   the atom `localhost` and ip_name/2 translates it into an ip/8
%   term.  In all other cases, including errors of the form error(_,_)
%   raised by ip_name/2, Domain is `inet`.

address_domain(Address, Domain) =>
    (   nonvar(Address),
        Address = Interface:_PortNumber,
        Interface \== localhost,
        catch(ip_name(IP, Interface), error(_,_), fail),
        functor(IP, ip, 8)
    ->  Domain = inet6
    ;   Domain = inet
    ).
244
245
253
%!  make_socket(+Address, :OptionsIn, -OptionsOut)
%
%   Obtain the listen socket for Address and extend the options with
%   what the rest of the server needs.  The clauses are tried in this
%   order:
%
%     1. For TCP addresses, the hook make_socket_hook/3 may take over.
%     2. If the caller passed tcp_socket(Socket), that socket is used
%        and only queue(Queue) is added.
%     3. For TCP addresses a socket is created, bound and put in
%        listen mode; queue(Queue) and tcp_socket(Socket) are added.
%     4. If unix_domain_socket/1 is available, the same is done for
%        unix_socket(Path) addresses.
%
%   A socket created here is closed again if configuring, binding or
%   listening raises an exception or fails (e.g., the port is in
%   use), so a failed attempt to start a server does not leak the
%   socket.  The exception is re-thrown unchanged.

make_socket(Address, M:Options0, Options) :-
    tcp_address(Address),
    make_socket_hook(Address, M:Options0, Options),
    !.
make_socket(Address, _:Options0, Options) :-
    option(tcp_socket(_), Options0),
    !,
    make_addr_atom('httpd', Address, Queue),
    Options = [ queue(Queue)
              | Options0
              ].
make_socket(Address, _:Options0, Options) :-
    tcp_address(Address),
    !,
    address_domain(Address, Domain),
    socket_create(Socket, [domain(Domain)]),
    close_socket_on_failure(
        Socket,
        ( tcp_setopt(Socket, reuseaddr),
          tcp_bind(Socket, Address),
          tcp_listen(Socket, 64),
          make_addr_atom('httpd', Address, Queue)
        )),
    Options = [ queue(Queue),
                tcp_socket(Socket)
              | Options0
              ].
:- if(current_predicate(unix_domain_socket/1)).
make_socket(Address, _:Options0, Options) :-
    Address = unix_socket(Path),
    !,
    unix_domain_socket(Socket),
    close_socket_on_failure(
        Socket,
        ( tcp_bind(Socket, Path),
          tcp_listen(Socket, 64),
          make_addr_atom('httpd', Address, Queue)
        )),
    Options = [ queue(Queue),
                tcp_socket(Socket)
              | Options0
              ].
:- endif.

%!  close_socket_on_failure(+Socket, +Goal)
%
%   Run Goal once.  If Goal raises an exception or fails, close Socket
%   (ignoring errors from closing) and propagate the exception or
%   failure.

close_socket_on_failure(Socket, Goal) :-
    (   catch(Goal, Error,
              ( close_socket_quietly(Socket),
                throw(Error)
              ))
    ->  true
    ;   close_socket_quietly(Socket),
        fail
    ).

close_socket_quietly(Socket) :-
    catch(tcp_close_socket(Socket), _, true).
296
%!  make_addr_atom(+Scheme, +Address, -Atom)
%
%   Atom is the concatenation of Scheme, `@` and the textual parts of
%   Address as produced by address_parts//1.  The result is used for
%   queue names and thread aliases.

make_addr_atom(Scheme, Address, Atom) :-
    phrase(address_parts(Address), AddressParts, []),
    atomic_list_concat([Scheme, '@' | AddressParts], Atom).
300
%!  address_parts(+Address)//
%
%   Emit the list of atomic parts that make up the textual form of
%   Address.  Handles atomic values, Host:Port pairs, ip/4 terms and
%   unix_socket(Path).
%
%   Every clause that recognises its argument commits.  In particular
%   the unix_socket(Path) clause must cut: without the cut,
%   backtracking into a successful call would reach the final clause
%   and raise a domain error for a perfectly valid address.
%
%   @error instantiation_error if Address (or a part of it) is unbound.
%   @error domain_error(http_server_address, Address) otherwise if
%          Address is not one of the supported forms.

address_parts(Var) -->
    { var(Var),
      !,
      instantiation_error(Var)
    }.
address_parts(Atomic) -->
    { atomic(Atomic) },
    !,
    [Atomic].
address_parts(Host:Port) -->
    !,
    address_parts(Host), [':'], address_parts(Port).
address_parts(ip(A,B,C,D)) -->
    !,
    [ A, '.', B, '.', C, '.', D ].
address_parts(unix_socket(Path)) -->
    !,
    [Path].
address_parts(Address) -->
    { domain_error(http_server_address, Address) }.
320
321
326
%!  create_server(:Goal, +Address, +Options)
%
%   Start the thread that accepts connections and register the server
%   in current_server/6.  The accept thread runs accept_server/3 and
%   gets an alias built from the scheme and the port.  This predicate
%   waits for the `server_started` message from that thread before it
%   records the server.

create_server(Goal, Address, Options) :-
    get_time(Started),
    thread_self(Me),
    memberchk(queue(Queue), Options),
    scheme(Scheme, Options),
    autoload_https(Scheme),
    address_port(Address, Port),
    make_addr_atom(Scheme, Port, Alias),
    thread_create(accept_server(Goal, Me, Options), _ThreadId,
                  [ alias(Alias) ]),
    thread_get_message(server_started),
    assertz(current_server(Port, Goal, Alias, Queue, Scheme, Started)).
340
%!  scheme(-Scheme, +Options)
%
%   Scheme is the URL scheme the server uses: the value of an explicit
%   scheme(Scheme) option, else `https` if Options holds ssl(_) or
%   ssl_instance(_), else `http`.

scheme(Scheme, Options) :-
    (   option(scheme(Scheme), Options)
    ->  true
    ;   (   option(ssl(_), Options)
        ;   option(ssl_instance(_), Options)
        )
    ->  Scheme = https
    ;   Scheme = http
    ).
351
%!  autoload_https(+Scheme)
%
%   Load library(http/http_ssl_plugin) when Scheme is `https`, no
%   clause for accept_hook/2 is defined yet and the library source
%   exists.  Succeeds without doing anything otherwise.

autoload_https(Scheme) :-
    (   Scheme = https,
        \+ clause(accept_hook(_Goal, _Options), _),
        exists_source(library(http/http_ssl_plugin))
    ->  use_module(library(http/http_ssl_plugin))
    ;   true
    ).
358
364
%!  http_current_server(?:Goal, ?Port)
%
%   True when a server registered in current_server/6 listens on Port
%   and handles its requests using Goal.  Enumerates all matching
%   servers on backtracking.

http_current_server(Goal, Port) :-
    current_server(Port, Goal, _Alias, _Queue, _Scheme, _StartTime).
367
368
381
%!  http_server_property(?Port, ?Property)
%
%   True when Property is a property of the server at Port.  Port may
%   also be given as Interface:Port with an integer port, in which
%   case only the port number is used.  Properties are goal(Goal),
%   scheme(Scheme) and start_time(Time).

http_server_property(Spec, Property) :-
    (   Spec = _Interface:PortNumber,
        integer(PortNumber)
    ->  server_property(Property, PortNumber)
    ;   server_property(Property, Spec)
    ).

%!  server_property(?Property, ?Port)
%
%   Look up a single property of a registered server in
%   current_server/6.

server_property(goal(Goal), Port) :-
    current_server(Port, Goal, _Alias, _Queue, _Scheme, _StartTime).
server_property(scheme(Scheme), Port) :-
    current_server(Port, _Goal, _Alias, _Queue, Scheme, _StartTime).
server_property(start_time(StartTime), Port) :-
    current_server(Port, _Goal, _Alias, _Queue, _Scheme, StartTime).
395
396
406
%!  http_workers(?Port, ?Workers)
%
%   Query or change the number of worker threads.  If Workers is an
%   integer, the pool of the server at Port is resized to that number;
%   Port must be ground in this mode.  Otherwise Workers is unified
%   with the number of queue_worker/2 facts for the queue of each
%   matching server.
%
%   @error existence_error(http_server, Port) when resizing and no
%          server is registered at Port.

http_workers(Port, Workers) :-
    (   integer(Workers)
    ->  must_be(ground, Port),
        (   current_server(Port, _, _, Queue, _, _)
        ->  resize_pool(Queue, Workers)
        ;   existence_error(http_server, Port)
        )
    ;   current_server(Port, _, _, Queue, _, _),
        aggregate_all(count, queue_worker(Queue, _), Workers)
    ).
418
419
429
%!  http_add_worker(+Port, +Options)
%
%   Add one worker thread to the server at Port.  Options are merged
%   with the options stored for the queue in queue_options/2; the
%   given Options are passed first to merge_options/3.
%
%   @error existence_error(http_server, Port) if no server is
%          registered at Port.

http_add_worker(Port, Options) :-
    must_be(ground, Port),
    (   current_server(Port, _, _, Queue, _, _)
    ->  queue_options(Queue, StoredOptions),
        merge_options(Options, StoredOptions, WorkerOptions),
        atom_concat(Queue, '_', AliasBase),
        create_workers(1, 1, Queue, AliasBase, WorkerOptions)
    ;   existence_error(http_server, Port)
    ).
440
441
448
%!  http_current_worker(?Port, ?ThreadID)
%
%   True when ThreadID is registered in queue_worker/2 as a worker for
%   the queue of the server at Port.

http_current_worker(Port, ThreadID) :-
    current_server(Port, _Goal, _Alias, Queue, _Scheme, _StartTime),
    queue_worker(Queue, ThreadID).
452
453
458
%!  accept_server(:Goal, +Initiator, +Options)
%
%   Body of the accept thread.  Runs the accept loop until it is
%   interrupted by the exception http_stop(Stopper).  It then removes
%   the current_server/6 fact whose alias is this thread, closes the
%   connections still pending in the queue, closes the server socket
%   and finally sends `http_stopped` to Stopper.

accept_server(Goal, Initiator, Options) :-
    StopSignal = http_stop(Stopper),
    catch(accept_server2(Goal, Initiator, Options), StopSignal, true),
    thread_self(Me),
    debug(http(stop), '[~p]: accept server received ~p', [Me, StopSignal]),
    retract(current_server(_Port, _, Me, Queue, _Scheme, _StartTime)),
    close_pending_accepts(Queue),
    close_server_socket(Options),
    thread_send_message(Stopper, http_stopped).
468
%!  accept_server2(:Goal, +Initiator, +Options)
%
%   Tell Initiator that the server has started and run the accept
%   loop.  The loop is failure driven and never succeeds; it is only
%   left through an exception for which accept_rethrow_error/1 holds.

accept_server2(Goal, Initiator, Options) :-
    thread_send_message(Initiator, server_started),
    repeat,
    accept_one(Goal, Options),
    fail.

%!  accept_one(:Goal, +Options)
%
%   Run accept_server3/2 once.  Exceptions that must stop the server
%   are re-thrown, other exceptions and plain failure are printed as
%   error messages.  Always succeeds unless it re-throws.

accept_one(Goal, Options) :-
    (   catch(accept_server3(Goal, Options), Error, true)
    ->  (   var(Error)
        ->  true
        ;   accept_rethrow_error(Error)
        ->  throw(Error)
        ;   print_message(error, Error)
        )
    ;   print_message(error,
                      goal_failed(accept_server3(Goal, Options)))
    ).
484
%!  accept_server3(:Goal, +Options)
%
%   Accept a single connection.  If the hook accept_hook/2 succeeds,
%   it has handled the accept.  Otherwise wait for a client on the
%   tcp_socket(Socket) from Options, post it on the queue(Queue) and
%   check whether there are enough workers to serve it.

accept_server3(Goal, Options) :-
    (   accept_hook(Goal, Options)
    ->  true
    ;   memberchk(tcp_socket(Socket), Options),
        memberchk(queue(Queue), Options),
        debug(http(connection), 'Waiting for connection', []),
        tcp_accept(Socket, Client, Peer),
        % Posting the client is wrapped in sig_atomic/1, as in the
        % original code; presumably so that a stop signal cannot
        % arrive between accepting and queueing the connection.
        sig_atomic(send_to_worker(Queue, Client, Goal, Peer)),
        http_enough_workers(Queue, accept, Peer)
    ).
495
%!  send_to_worker(+Queue, +Client, :Goal, +Peer)
%
%   Post the accepted connection Client on Queue as the message
%   tcp_client(Client, Goal, Peer).

send_to_worker(Queue, Client, Goal, Peer) :-
    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
    Job = tcp_client(Client, Goal, Peer),
    thread_send_message(Queue, Job).

%!  accept_rethrow_error(+Exception)
%
%   True for exceptions that must terminate the accept loop instead
%   of being printed.

accept_rethrow_error(http_stop(_Stopper)).
501
505
%!  close_server_socket(+Options)
%
%   Close the server socket.  The hook close_hook/1 is tried first;
%   if it does not succeed, the socket from tcp_socket(Socket) in
%   Options is closed.  Fails if neither applies.

close_server_socket(Options) :-
    (   close_hook(Options)
    ->  true
    ;   memberchk(tcp_socket(Socket), Options)
    ->  tcp_close_socket(Socket)
    ).
513
515
%!  close_pending_accepts(+Queue)
%
%   Drain Queue without waiting (timeout 0) and close the client
%   connection of every message found.

close_pending_accepts(Queue) :-
    thread_get_message(Queue, Pending, [timeout(0)]),
    !,
    close_client(Pending),
    close_pending_accepts(Queue).
close_pending_accepts(_Queue).
522
%!  close_client(+Message)
%
%   Dispose of a queued message during shut-down.  A
%   tcp_client(Client, Goal, Peer) message has its socket closed.  Any
%   other message is offered to the hook discard_client_hook/1; a
%   warning is printed if the hook does not handle it.

close_client(tcp_client(Socket, _Goal, _0Peer)) =>
    debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]),
    tcp_close_socket(Socket).
close_client(Other) =>
    (   discard_client_hook(Other)
    ->  true
    ;   print_message(warning, http_close_client(Other))
    ).
531
532
539
%!  http_stop_server(+Port, +Options)
%
%   Stop the server at Port.  Port may be given as Host:Port with a
%   ground Host, in which case only the port is used.  Options is
%   currently ignored.

http_stop_server(Address, Options) :-
    (   Address = Host:Port,
        ground(Host)
    ->  http_stop_server(Port, Options)
    ;   stop_server_on_port(Address)
    ).

%!  stop_server_on_port(+Port)
%
%   Do the actual work of stopping a server: reduce the worker pool to
%   zero, signal the accept thread with http_stop(Stopper), wait for
%   it and destroy the queue.

stop_server_on_port(Port) :-
    http_workers(Port, 0),
    current_server(Port, _, AcceptThread, Queue, _Scheme, _Start),
    retractall(queue_options(Queue, _)),
    debug(http(stop), 'Signalling HTTP server thread ~p to stop',
          [AcceptThread]),
    thread_self(Me),
    thread_signal(AcceptThread, throw(http_stop(Me))),
    % If no confirmation arrives within 0.1 seconds, make a connection
    % to the server; presumably this wakes up an accept thread that is
    % blocked waiting for a client so it can process the signal.
    (   thread_get_message(Me, http_stopped, [timeout(0.1)])
    ->  true
    ;   catch(connect(localhost:Port), _, true)
    ),
    thread_join(AcceptThread, _0Status),
    debug(http(stop), 'Joined HTTP server thread ~p (~p)',
          [AcceptThread, _0Status]),
    message_queue_destroy(Queue).

%!  connect(+Address)
%
%   Open a TCP connection to Address.  The socket is always closed
%   again.

connect(Address) :-
    setup_call_cleanup(
        tcp_socket(Sock),
        tcp_connect(Sock, Address),
        tcp_close_socket(Sock)).
564
570
%!  http_enough_workers(+Queue, +Why, +Peer)
%
%   Check whether Queue has enough workers.  Nothing happens if
%   threads are waiting on the queue or if the number of queued
%   messages is acceptable for Why according to enough/2.  Otherwise
%   the hook http:schedule_workers/1 is called with a dict holding
%   the keys `port`, `reason`, `peer`, `waiting` and `queue`.  Errors
%   raised by the hook are printed; failure of the hook is ignored.

http_enough_workers(Queue, Why, Peer) :-
    (   message_queue_property(Queue, waiting(_0))
    ->  debug(http(scheduler), '~D waiting for work; ok', [_0])
    ;   message_queue_property(Queue, size(Size)),
        (   enough(Size, Why)
        ->  debug(http(scheduler), '~D in queue; ok', [Size])
        ;   ignore(( current_server(Port, _, _, Queue, _, _),
                     Data = _{ port:Port,
                               reason:Why,
                               peer:Peer,
                               waiting:Size,
                               queue:Queue
                             },
                     debug(http(scheduler),
                           'Asking to reschedule: ~p', [Data]),
                     catch(http:schedule_workers(Data),
                           Error,
                           print_message(error, Error))
                   ))
        )
    ).
593
%!  enough(?QueueSize, ?Why)
%
%   True when QueueSize pending messages is acceptable for the event
%   Why.  An empty queue is always fine; one pending message is fine
%   when the event is `keep_alive`.

enough(0, _AnyReason).
enough(1, keep_alive).
597
623
624
625 628
633
%!  create_workers(+Options)
%
%   Create the message queue named by queue(Queue) and start the
%   initial workers on it.  The number of workers is taken from
%   workers(N), default 5.  Errors from creating the queue are
%   ignored.  Finally Options are stored in queue_options/2 for
%   workers that are created later.

create_workers(Options) :-
    option(workers(Count), Options, 5),
    option(queue(Queue), Options),
    catch(message_queue_create(Queue), _, true),
    atom_concat(Queue, '_', AliasBase),
    create_workers(1, Count, Queue, AliasBase, Options),
    assertz(queue_options(Queue, Options)).

%!  create_workers(+I, +N, +Queue, +AliasBase, +Options)
%
%   Create workers numbered I..N for Queue.  Each worker runs
%   http_worker/1, gets an alias generated from AliasBase using
%   gensym/2 and is registered in queue_worker/2.  Options are also
%   passed as thread creation options.

create_workers(I, N, Queue, AliasBase, Options) :-
    (   I > N
    ->  true
    ;   gensym(AliasBase, Alias),
        thread_create(http_worker(Options), ThreadId,
                      [ alias(Alias)
                      | Options
                      ]),
        assertz(queue_worker(Queue, ThreadId)),
        Next is I + 1,
        create_workers(Next, N, Queue, AliasBase, Options)
    ).
654
655
660
%!  resize_pool(+Queue, +Size)
%
%   Make the number of workers on Queue equal to Size.  Missing
%   workers are created using the options stored in queue_options/2.
%   Excess workers are removed by posting one quit(Me) message per
%   worker and waiting for as many quitted(_) replies.

resize_pool(Queue, Size) :-
    aggregate_all(count, queue_worker(Queue, _), Now),
    (   Now < Size
    ->  queue_options(Queue, Options),
        atom_concat(Queue, '_', AliasBase),
        First is Now+1,
        create_workers(First, Size, Queue, AliasBase, Options)
    ;   Now == Size
    ->  true
    ;   Now > Size
    ->  Excess is Now - Size,
        thread_self(Me),
        forall(between(1, Excess, _),
               thread_send_message(Queue, quit(Me))),
        forall(between(1, Excess, _),
               thread_get_message(quitted(_)))
    ).
677
678
686
%!  http_worker(+Options)
%
%   Body of a worker thread.  Registers done_worker/0 to run when the
%   thread exits and then runs a failure-driven loop that takes
%   messages from the queue(Queue) in Options.  A quit(Sender) message
%   ends the loop; all other messages are served as client
%   connections.  The option max_idle_time(Time) (default `infinite`)
%   limits how long the worker waits for work.

http_worker(Options) :-
    debug(http(scheduler), 'New worker', []),
    prolog_listen(this_thread_exit, done_worker),
    option(queue(Queue), Options),
    option(max_idle_time(MaxIdle), Options, infinite),
    thread_repeat_wait(get_work(Queue, Job, MaxIdle)),
    debug(http(worker), 'Waiting for a job ...', []),
    debug(http(worker), 'Got job ~p', [Job]),
    (   Job = quit(Sender)
    ->  !,                              % leave the repeat loop
        worker_quit(Sender, Queue)
    ;   worker_serve(Job, Queue, Options),
        fail                            % back to waiting for work
    ).

%!  worker_quit(+Sender, +Queue)
%
%   Handle a quit(Sender) message.  The thread detaches itself.
%   Unless Sender is `idle`, the worker is removed from
%   queue_worker/2 and quitted(Self) is sent to Sender.

worker_quit(Sender, Queue) :-
    thread_self(Self),
    thread_detach(Self),
    (   Sender == idle
    ->  true
    ;   retract(queue_worker(Queue, Self)),
        thread_send_message(Sender, quitted(Self))
    ).

%!  worker_serve(+Job, +Queue, +Options)
%
%   Open the client connection described by Job and process it.
%   Succeeds only if processing ended with an error (an exception or
%   failure of http_process/4), after printing the error and closing
%   the connection.  Fails if the connection could not be opened or
%   processing completed normally.

worker_serve(Job, Queue, Options) :-
    open_client(Job, Queue, Goal, In, Out,
                Options, ClientOptions),
    (   catch(http_process(Goal, In, Out, ClientOptions),
              Error, true)
    ->  true
    ;   Error = goal_failed(http_process/4)
    ),
    nonvar(Error),
    current_message_level(Error, Level),
    print_message(Level, Error),
    memberchk(peer(Peer), ClientOptions),
    close_connection(Peer, In, Out).
720
%!  get_work(+Queue, -Message, +MaxIdle)
%
%   Wait for the next Message on Queue.  If MaxIdle is `infinite`, the
%   wait is unbounded.  Otherwise MaxIdle is used as timeout and
%   Message is quit(idle) if nothing arrived in time.

get_work(Queue, Message, MaxIdle) :-
    (   MaxIdle = infinite
    ->  thread_get_message(Queue, Message)
    ;   thread_get_message(Queue, Message, [timeout(MaxIdle)])
    ->  true
    ;   Message = quit(idle)
    ).
729
730
736
%!  open_client(+Message, +Queue, -Goal, -In, -Out,
%!              +Options, -ClientOptions)
%
%   Turn a queue message into an open connection.  For a
%   requeue(In, Out, Goal, ClientOptions) message the streams are
%   already open; the worker waits at most keep_alive_timeout seconds
%   (default 2) for new input on the connection.  For any other
%   message the connection is opened through open_client/6 and
%   ClientOptions is extended with pool(client(Queue,Goal,In,Out))
%   and timeout(Timeout), default 60.  Errors from opening are
%   printed and cause failure.

open_client(requeue(In, Out, Goal, ClientOptions),
            _Queue, Goal, In, Out, ServerOptions, ClientOptions) :-
    !,
    memberchk(peer(Peer), ClientOptions),
    option(keep_alive_timeout(KeepAlive), ServerOptions, 2),
    check_keep_alive_connection(In, KeepAlive, Peer, In, Out).
open_client(Message, Queue, Goal, In, Out, ServerOptions,
            [ pool(client(Queue, Goal, In, Out)),
              timeout(Timeout)
            | ConnOptions
            ]) :-
    catch(open_client(Message, Goal, In, Out, ConnOptions, ServerOptions),
          Error, report_error(Error)),
    option(timeout(Timeout), ServerOptions, 60),
    (   debugging(http(connection))
    ->  memberchk(peer(Peer), ConnOptions),
        debug(http(connection), 'Opened connection from ~p', [Peer])
    ;   true
    ).
756
757
760
%!  open_client(+Message, -Goal, -In, -Out, -ClientOptions, +Options)
%
%   Open the streams for a new client.  The hook open_client_hook/6
%   is tried first.  Without a hook, a tcp_client(Socket, Goal, Peer)
%   message is opened as a plain socket and ClientOptions is
%   [peer(Peer), protocol(http)].

open_client(Message, Goal, In, Out, ClientOptions, Options) :-
    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
    !.
open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
            [peer(Peer), protocol(http)], _Options) :-
    tcp_open_socket(Socket, In, Out).

%!  report_error(+Error)
%
%   Print Error as an error message and fail.

report_error(Error) :-
    print_message(error, Error),
    fail.
773
774
780
%!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out)
%
%   Wait for input on a keep-alive connection.  The timeout of In is
%   temporarily set to TimeOut while peeking for the next character.
%   If input arrives, the old timeout is restored and the predicate
%   succeeds.  If the peer closed the connection (end of file) or
%   peeking raised an exception, the connection is closed and the
%   predicate fails.

check_keep_alive_connection(In, TMO, Peer, In, Out) :-
    stream_property(In, timeout(SavedTMO)),
    set_stream(In, timeout(TMO)),
    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
    catch(peek_code(In, Code), Error, true),
    (   var(Error),                     % no exception
        Code \== -1                     % and not at end of file
    ->  set_stream(In, timeout(SavedTMO)),
        debug(http(keep_alive), '\tre-using keep-alive connection', [])
    ;   keep_alive_lost(Code),
        close_connection(Peer, In, Out),
        fail
    ).

%!  keep_alive_lost(+Code)
%
%   Emit the debug message that explains why a keep-alive connection
%   is dropped.  Code is -1 if the remote closed the connection; every
%   other outcome is reported as a timeout.

keep_alive_lost(Code) :-
    (   Code == -1
    ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
    ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
    ).
797
798
804
%!  done_worker
%
%   Called when a worker thread exits.  If the worker is still
%   registered in queue_worker/2, it detaches itself, is unregistered
%   and, depending on its exit status, replaced by a new worker (see
%   recreate_worker/2).  If no replacement is created, or the worker
%   was not registered, a httpd_stopped_worker message is printed at
%   the level given by done_status_message_level/2.

done_worker :-
    thread_self(Self),
    (   thread_detach(Self),
        retract(queue_worker(Queue, Self)),
        thread_property(Self, status(Status))
    ->  (   catch(recreate_worker(Status, Queue), _, fail)
        ->  print_message(informational,
                          httpd_restarted_worker(Self))
        ;   done_status_message_level(Status, Level),
            print_message(Level,
                          httpd_stopped_worker(Self, Status))
        )
    ;   thread_property(Self, status(Status)),
        done_status_message_level(Status, Level),
        print_message(Level,
                      httpd_stopped_worker(Self, Status))
    ).
824
%!  done_status_message_level(+Status, -Level)
%
%   Level is the message level used to report that a worker stopped
%   with exit Status.  Normal termination, abort and halt are
%   `silent`; everything else is `informational`.

done_status_message_level(Status, silent) :-
    quiet_exit_status(Status),
    !.
done_status_message_level(_Status, informational).

quiet_exit_status(true).
quiet_exit_status(exception('$aborted')).
quiet_exit_status(exception(unwind(abort))).
quiet_exit_status(exception(unwind(halt(_)))).
830
831
843
%!  recreate_worker(+Status, +Queue)
%
%   Decide what to do after a worker died with Status.  If the worker
%   died because it could not write to `user_error`, the process is
%   halted with exit code 2.  If it died from an exception accepted
%   by recreate_on_error/1, a replacement worker is created for Queue
%   using the options stored in queue_options/2.  Fails otherwise.

recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
    halt(2).
recreate_worker(exception(Reason), Queue) :-
    recreate_on_error(Reason),
    queue_options(Queue, StoredOptions),
    atom_concat(Queue, '_', AliasBase),
    create_workers(1, 1, Queue, AliasBase, StoredOptions).

%!  recreate_on_error(+Exception)
%
%   True for exceptions after which a worker must be replaced.

recreate_on_error('$aborted').
recreate_on_error(unwind(abort)).
recreate_on_error(time_limit_exceeded).
855
862
:- multifile
    message_level/2.

%!  message_level(+Exception, -Level)
%
%   Multifile table that maps exceptions raised while processing a
%   request onto the level at which they are printed.

message_level(error(io_error(read, _), _),               silent).
message_level(error(socket_error(epipe,_), _),           silent).
message_level(error(http_write_short(_Obj,_Written), _), silent).
message_level(error(timeout_error(read, _), _),          informational).
message_level(keep_alive_timeout,                        silent).

%!  current_message_level(+Term, -Level)
%
%   Level is the first level message_level/2 gives for Term, or
%   `error` if there is none.

current_message_level(Term, Level) :-
    message_level(Term, Level),
    !.
current_message_level(_Term, error).
877
882
%!  read_remaining_request(+StartBody, +Request)
%
%   Skip the part of the request body that the handler did not read.
%   If Request has content_length(Len), the number of bytes left is
%   computed from StartBody, Len and the current byte count of the
%   input stream, and that many bytes are discarded.  Without a
%   content length nothing is done.

read_remaining_request(StartBody, Request) :-
    (   memberchk(content_length(Len), Request)
    ->  memberchk(pool(client(_Queue, _Goal, In, _Out)), Request),
        byte_count(In, Here),
        Left is StartBody+Len-Here,
        read_incomplete(In, Left)
    ;   true
    ).

%!  read_incomplete(+In, +Left)
%
%   Discard Left bytes from In by copying them to a null stream.
%   Fails if copying raises an error(_,_) exception.

read_incomplete(In, Left) :-
    (   Left = 0
    ->  true
    ;   catch(setup_call_cleanup(
                  open_null_stream(Sink),
                  copy_stream_data(In, Sink, Left),
                  close(Sink)),
              error(_,_),
              fail)
    ).
902
907
%!  http_requeue(+Header)
%
%   Put a keep-alive connection back on the worker queue.  The
%   connection details are taken from the pool(client(...)) and
%   peer(Peer) elements of Header.  Before posting the requeue/4
%   message, http_enough_workers/3 is called with reason `keep_alive`.
%   Fails, after printing a debug message on topic http(error), if
%   the connection cannot be requeued.

http_requeue(Header) :-
    (   requeue_header(Header, ClientOptions),
        memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
        memberchk(peer(Peer), ClientOptions),
        http_enough_workers(Queue, keep_alive, Peer),
        thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions))
    ->  true
    ;   debug(http(error), 'Re-queue failed: ~p', [Header]),
        fail
    ).
918
%!  requeue_header(+Header, -ClientOptions)
%
%   ClientOptions holds, in their original order, the elements of
%   Header for which requeue_keep/1 is true.  All other elements are
%   dropped.
%
%   The base clause for the empty list is required: without it the
%   recursion cannot terminate successfully and every attempt to
%   requeue a connection fails.

requeue_header([], []).
requeue_header([H|T0], [H|T]) :-
    requeue_keep(H),
    !,
    requeue_header(T0, T).
requeue_header([_|T0], T) :-
    requeue_header(T0, T).

%!  requeue_keep(+Field)
%
%   True for request fields that describe the connection and must be
%   kept when the connection is requeued.

requeue_keep(pool(_)).
requeue_keep(peer(_)).
requeue_keep(protocol(_)).
930
931
935
%!  http_process(:Goal, +In, +Out, +Options)
%
%   Handle a single request on the connection In/Out.  Both streams
%   get the timeout from timeout(Seconds) in Options (default 60).
%   The request is processed by http_wrapper/5; next/3 then decides
%   what happens with the connection.

http_process(Goal, In, Out, Options) :-
    debug(http(server), 'Running server goal ~p on ~p -> ~p',
          [Goal, In, Out]),
    option(timeout(Seconds), Options, 60),
    set_stream(In, timeout(Seconds)),
    set_stream(Out, timeout(Seconds)),
    WrapperOptions = [ request(Request),
                       byte_count(StartBody)
                     | Options
                     ],
    http_wrapper(Goal, In, Out, Connection, WrapperOptions),
    next(Connection, StartBody, Request).
948
%!  next(+Connection, +StartBody, +Request)
%
%   Decide what to do with the connection after a request has been
%   handled.  Prints a warning if next_/3 fails.

next(Connection, StartBody, Request) :-
    (   next_(Connection, StartBody, Request)
    ->  true
    ;   print_message(warning,
                      goal_failed(next(Connection,StartBody,Request)))
    ).

%!  next_(+Connection, +StartBody, +Request)
%
%   Connection is the connection status from the handler:
%
%     - switch_protocol(Goal, Options): call Goal on the raw streams.
%       The connection is closed if Goal fails or raises an error,
%       which is printed.
%     - spawned(ThreadId): another thread took over; nothing to do.
%     - a case variation of `keep-alive`: discard unread request data
%       and requeue the connection.
%     - anything else, or if requeueing fails: close the connection.

next_(switch_protocol(SwitchGoal, _SwitchOptions), _StartBody, Request) :-
    !,
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    (   catch(call(SwitchGoal, In, Out), Error,
              ( print_message(error, Error),
                fail
              ))
    ->  true
    ;   http_close_connection(Request)
    ).
next_(spawned(ThreadId), _StartBody, _Request) :-
    !,
    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
next_(Connection, StartBody, Request) :-
    downcase_atom(Connection, 'keep-alive'),
    read_remaining_request(StartBody, Request),
    http_requeue(Request),
    !.
next_(_Connection, _StartBody, Request) :-
    http_close_connection(Request).
973
974
978
%!  http_close_connection(+Request)
%
%   Close the connection that belongs to Request.  The streams are
%   taken from the pool(client(_,_,In,Out)) element and the peer from
%   peer(Peer).

http_close_connection(Request) :-
    memberchk(peer(Peer), Request),
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    close_connection(Peer, In, Out).

%!  close_connection(+Peer, +In, +Out)
%
%   Force-close both streams of a connection.  Exceptions raised
%   while closing are ignored.  Peer is only used for the debug
%   message.

close_connection(Peer, In, Out) :-
    debug(http(connection), 'Closing connection from ~p', [Peer]),
    ForceClose = [force(true)],
    catch(close(In, ForceClose), _, true),
    catch(close(Out, ForceClose), _, true).
993
1009
%!  http_spawn(:Goal, +Options)
%
%   Continue handling the current request in a new detached thread
%   that runs Goal.  With the option pool(Pool), the thread is
%   created in that thread pool:
%
%     - if the pool has no free threads, http_reply(busy) is thrown;
%     - if the pool does not exist, create_pool/1 is called and the
%       spawn is retried;
%     - other errors are re-thrown.
%
%   Without a pool option, a plain thread is created using Options as
%   thread creation options.

http_spawn(Goal, Options) :-
    current_output(CGI),
    (   select_option(pool(Pool), Options, ThreadOptions)
    ->  Error = error(Formal, _),
        catch(thread_create_in_pool(Pool,
                                    wrap_spawned(CGI, Goal), Id,
                                    [ detached(true)
                                    | ThreadOptions
                                    ]),
              Error,
              true),
        (   var(Formal)
        ->  http_spawned(Id)
        ;   Formal = resource_error(threads_in_pool(_))
        ->  throw(http_reply(busy))
        ;   Formal = existence_error(thread_pool, Pool),
            create_pool(Pool)
        ->  http_spawn(Goal, Options)
        ;   throw(Error)
        )
    ;   thread_create(wrap_spawned(CGI, Goal), Id,
                      [ detached(true)
                      | Options
                      ]),
        http_spawned(Id)
    ).
1038
%!  wrap_spawned(+CGI, :Goal)
%
%   Body of a spawned handler thread.  Makes the CGI stream the
%   current output, obtains the request from it and runs Goal through
%   http_wrap_spawned/3.  next/3 then decides what happens with the
%   connection.

wrap_spawned(CGI, Goal) :-
    set_output(CGI),
    cgi_property(CGI, request(Request)),
    memberchk(input(In), Request),
    byte_count(In, StartBody),
    http_wrap_spawned(Goal, Request, Connection),
    next(Connection, StartBody, Request).
1046
1054
%!  create_pool(+Pool)
%
%   Create the thread pool Pool on demand.  The hook
%   http:create_pool/1 is tried first; a permission error for
%   creating this pool raised by the hook counts as success.  If the
%   hook fails, a pool of size 10 is created and an informational
%   message is printed.

create_pool(Pool) :-
    catch(http:create_pool(Pool),
          error(permission_error(create, thread_pool, Pool), _),
          true).
create_pool(Pool) :-
    print_message(informational, httpd(created_pool(Pool))),
    thread_pool_create(Pool, 10, []).
1061
1062
1063 1066
:- meta_predicate
    thread_repeat_wait(0).

%!  thread_repeat_wait(:Goal)
%
%   Act as `repeat, call(Goal)`, i.e., run Goal again each time the
%   caller backtracks into this predicate.  A moving average of the
%   time between iterations is maintained (see update_rate_mma/2).
%   When iterations follow each other quickly (class `brief`), Goal
%   is called directly; otherwise it is run through thread_idle/2
%   with the class (`short` or `long`) as second argument.

thread_repeat_wait(Goal) :-
    new_rate_mma(5, 1000, RateState),
    repeat,
    notrace,
    update_rate_mma(RateState, AvgInterval),
    long(AvgInterval, Class),
    (   Class == brief
    ->  call(Goal)
    ;   thread_idle(Goal, Class)
    ).
1085
%!  long(+MMA, -Class)
%
%   Classify the average interval MMA (in seconds): `brief` below
%   0.05, `short` below 1 and `long` otherwise.  Class is unified
%   before the comparison so the result is the same as for the
%   clause-per-class formulation, also if Class is already bound.

long(MMA, Class) :-
    (   Class = brief,
        MMA < 0.05
    ->  true
    ;   Class = short,
        MMA < 1
    ->  true
    ;   Class = long
    ).
1093
1105
%!  new_rate_mma(+N, +Resolution, -State)
%
%   Create the state for a moving average over the interval between
%   events.  State is the term
%
%       rstate(Base, Last, MaxI, Resolution, N, MMA)
%
%   where Base is the creation time, Last and MMA start at 0 and MaxI
%   is the value of the Prolog flag `max_tagged_integer`.

new_rate_mma(N, Resolution, State) :-
    current_prolog_flag(max_tagged_integer, MaxI),
    get_time(Base),
    State = rstate(Base, 0, MaxI, Resolution, N, 0).
1109
%!  update_rate_mma(!State, -MMA)
%
%   Register a new event in State, a term created by new_rate_mma/3,
%   and return the updated moving average MMA of the time between
%   events, in seconds.  State is updated destructively using
%   nb_setarg/3.
%
%   Time stamps are integers counting 1/Resolution seconds since the
%   base time.  When a stamp exceeds MaxI, the base time is reset to
%   the current time and the last stamp to 0.  In that case the last
%   stamp must not be overwritten with the stamp relative to the old
%   base: that would undo the reset and make the next interval a
%   large negative number.

update_rate_mma(State, MMAr) :-
    State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
    get_time(Now),
    Stamp is round((Now-Base)*Resolution),
    Diff is Stamp-Last,                 % both relative to the old base
    (   Stamp > MaxI
    ->  nb_setarg(1, State, Now),       % restart counting from now
        nb_setarg(2, State, 0)
    ;   nb_setarg(2, State, Stamp)
    ),
    MMA is round(((N-1)*MMA0+Diff)/N),
    nb_setarg(6, State, MMA),
    MMAr is MMA/float(Resolution).
1124
1125
1126 1129
:- multifile
    prolog:message/3.

% Translations for the messages printed by this module.

prolog:message(httpd_started_server(Address, Options)) -->
    [ 'Started server at '-[] ],
    http_root(Address, Options).
prolog:message(httpd_stopped_worker(Worker, Status)) -->
    [ 'Stopped worker ~p: ~p'-[Worker, Status] ].
prolog:message(httpd_restarted_worker(Worker)) -->
    [ 'Replaced aborted worker ~p'-[Worker] ].
prolog:message(httpd(created_pool(Pool))) -->
    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
      'Create this pool at startup-time or define the hook ', nl,
      'http:create_pool/1 to avoid this message and create a ', nl,
      'pool that fits the usage-profile.'
    ].
1146
%!  http_root(+Address, +Options)//
%
%   Message fragment holding the location of the server as a
%   url(URI) element.

http_root(Address, Options) -->
    { landing_page(Address, URI, Options) },
    [ url(URI) ].

%!  landing_page(+Address, -URI, +Options)
%
%   URI describes where the server at Address can be reached.  For
%   Host:Port the scheme is looked up using http_server_property/2
%   and the port is omitted if it is the default port for the scheme.
%   For a Unix domain socket a descriptive string is returned.  A
%   plain Port is treated as localhost:Port.

landing_page(Host:Port, URI, Options) :-
    !,
    must_be(atom, Host),
    must_be(integer, Port),
    http_server_property(Port, scheme(Scheme)),
    server_base_url(Scheme, Host, Port, Base),
    entry_page(Base, URI, Options).
landing_page(unix_socket(Path), URI, _Options) :-
    !,
    format(string(URI), 'Unix domain socket "~w"', [Path]).
landing_page(Port, URI, Options) :-
    landing_page(localhost:Port, URI, Options).

%!  server_base_url(+Scheme, +Host, +Port, -Base)
%
%   Base is the atom Scheme://Host, followed by :Port unless Port is
%   the default port of Scheme.

server_base_url(Scheme, Host, Port, Base) :-
    (   default_port(Scheme, Port)
    ->  format(atom(Base), '~w://~w', [Scheme, Host])
    ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
    ).

%!  default_port(?Scheme, ?Port)
%
%   Port is the default port for Scheme.

default_port(http,  80).
default_port(https, 443).
1169
1170entry_page(Base, URI, Options) :-
1171 option(entry_page(Entry), Options),
1172 !,
1173 uri_resolve(Entry, Base, URI).
1174entry_page(Base, URI, _) :-
1175 http_absolute_location(root(.), Entry, []),
1176 uri_resolve(Entry, Base, URI)