1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2002-2024, University of Amsterdam 7 VU University Amsterdam 8 CWI, Amsterdam 9 SWI-Prolog Solutions b.v. 10 All rights reserved. 11 12 Redistribution and use in source and binary forms, with or without 13 modification, are permitted provided that the following conditions 14 are met: 15 16 1. Redistributions of source code must retain the above copyright 17 notice, this list of conditions and the following disclaimer. 18 19 2. Redistributions in binary form must reproduce the above copyright 20 notice, this list of conditions and the following disclaimer in 21 the documentation and/or other materials provided with the 22 distribution. 23 24 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 25 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 26 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 27 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 28 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 29 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 30 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 31 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 32 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 33 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 34 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 35 POSSIBILITY OF SUCH DAMAGE. 
36*/ 37 38:- module(thread_httpd, 39 [ http_current_server/2, % ?:Goal, ?Port 40 http_server_property/2, % ?Port, ?Property 41 http_server/2, % :Goal, +Options 42 http_workers/2, % +Port, ?WorkerCount 43 http_add_worker/2, % +Port, +Options 44 http_current_worker/2, % ?Port, ?ThreadID 45 http_stop_server/2, % +Port, +Options 46 http_spawn/2, % :Goal, +Options 47 48 http_requeue/1, % +Request 49 http_close_connection/1, % +Request 50 http_enough_workers/3 % +Queue, +Why, +Peer 51 ]). 52:- use_module(library(debug)). 53:- use_module(library(error)). 54:- use_module(library(option)). 55:- use_module(library(socket)). 56:- use_module(library(thread_pool)). 57:- use_module(library(gensym)). 58:- use_module(http_wrapper). 59:- use_module(http_path). 60:- use_module(http_stream). 61 62:- autoload(library(uri), [uri_resolve/3]). 63:- autoload(library(aggregate), [aggregate_all/3]). 64 65:- predicate_options(http_server/2, 2, 66 [ port(any), 67 unix_socket(atom), 68 entry_page(atom), 69 tcp_socket(any), 70 workers(positive_integer), 71 timeout(number), 72 keep_alive_timeout(number), 73 silent(boolean), 74 ssl(list(any)), % if http/http_ssl_plugin is loaded 75 pass_to(system:thread_create/3, 3) 76 ]). 77:- predicate_options(http_spawn/2, 2, 78 [ pool(atom), 79 pass_to(system:thread_create/3, 3), 80 pass_to(thread_pool:thread_create_in_pool/4, 4) 81 ]). 82:- predicate_options(http_add_worker/2, 2, 83 [ timeout(number), 84 keep_alive_timeout(number), 85 max_idle_time(number), 86 pass_to(system:thread_create/3, 3) 87 ]).
% Meta-argument declarations.  The goal passed to http_server/2 and
% http_current_server/2 is a closure that is extended with one
% argument, while http_spawn/2 and thread_repeat_wait/1 take a goal
% that is called as-is.  The second argument of http_server/2 is
% module sensitive because its clauses match it as `M:Options0`.
:- meta_predicate
    http_server(1, :),
    http_current_server(1, ?),
    http_spawn(0, +).

% Dynamic state describing the running servers and their workers.
:- dynamic
    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
    queue_worker/2,         % Queue, ThreadID
    queue_options/2.        % Queue, Options

% Hooks that plugins may define to change how connections are
% created, accepted, opened and closed.
:- multifile
    make_socket_hook/3,
    accept_hook/2,
    close_hook/1,
    open_client_hook/6,
    discard_client_hook/1,
    http:create_pool/1,
    http:schedule_workers/1.

:- meta_predicate
    thread_repeat_wait(0).
AF_UNIX
). See socket_create/2.main
thread.
If you need to control resource usage you may consider the
spawn
option of http_handler/3 and library(thread_pool).true
(default false
), do not print an informational
message that the server was started.

A typical initialization for an HTTP server that uses http_dispatch/1 to relay requests to predicates is:
    :- use_module(library(http/thread_httpd)).
    :- use_module(library(http/http_dispatch)).

    start_server(Port) :-
        http_server(http_dispatch, [port(Port)]).
Note that multiple servers can coexist in the same Prolog process. A notable application of this is to have both an HTTP and HTTPS server, where the HTTP server redirects to the HTTPS server for handling sensitive requests.
%!  http_server(:Goal, :Options)
%
%   Start a server that handles requests using Goal.  The address is
%   taken from the port(Port) or unix_socket(Path) option.  After the
%   socket, the workers and the accept thread are created, an
%   informational message is printed unless silent(true) is given.
%
%   @error existence_error(server_address, Options) if Options holds
%          neither port(_) nor unix_socket(_).

http_server(Goal, M:UserOptions) :-
    server_address(Address, UserOptions),
    !,
    make_socket(Address, M:UserOptions, ServerOptions),
    create_workers(ServerOptions),
    create_server(Goal, Address, ServerOptions),
    (   option(silent(true), UserOptions)
    ->  true
    ;   print_message(informational,
                      httpd_started_server(Address, UserOptions))
    ).
http_server(_Goal, _:UserOptions) :-
    existence_error(server_address, UserOptions).

%   server_address(-Address, +Options)
%
%   Address is the value of the port(_) option or, if that is
%   missing, unix_socket(Path).  Fails if neither option is present.

server_address(Address, Options) :-
    (   option(port(TcpAddress), Options)
    ->  Address = TcpAddress
    ;   option(unix_socket(SocketFile), Options)
    ->  Address = unix_socket(SocketFile)
    ).

%   address_port(+Address, -Port)
%
%   Port is the key under which the server is registered: the port
%   of Interface:Port, the path of unix_socket(Path) or Address
%   itself.

address_port(_Interface:Port, Port) :- !.
address_port(unix_socket(SocketFile), SocketFile) :- !.
address_port(Address, Address) :- !.

%   tcp_address(@Address)
%
%   True when Address is unbound, an integer or Interface:Port.

tcp_address(Address) :-
    (   var(Address)
    ->  true
    ;   integer(Address)
    ->  true
    ;   Address = _Interface:_Port
    ).

%   address_domain(+Address, -Domain)
%
%   Domain is `inet6` if the interface resolves to an 8-argument
%   ip/8 term and `inet` in all other cases.

address_domain(localhost:_Port, Domain) =>
    Domain = inet.
address_domain(Interface:_Port, Domain) =>
    (   catch(ip_name(IP, Interface), error(_,_), fail),
        functor(IP, ip, 8)
    ->  Domain = inet6
    ;   Domain = inet
    ).
address_domain(_, Domain) =>
    Domain = inet.
queue(QueueId)
.
%   make_socket(+Address, :OptionsIn, -OptionsOut)
%
%   Create the listening socket.  OptionsOut is OptionsIn extended
%   with queue(QueueId) and, unless the caller already supplied it,
%   tcp_socket(Socket).  The clauses are tried in order:
%
%     1. The hook make_socket_hook/3 for TCP addresses.
%     2. A tcp_socket(_) option is already present: only add the queue.
%     3. A TCP address: create, bind and listen on a new socket.
%     4. unix_socket(Path), if unix_domain_socket/1 is available.

make_socket(Address, M:OptionsIn, OptionsOut) :-
    tcp_address(Address),
    make_socket_hook(Address, M:OptionsIn, OptionsOut),
    !.
make_socket(Address, _:OptionsIn, OptionsOut) :-
    option(tcp_socket(_), OptionsIn),
    !,
    make_addr_atom('httpd', Address, QueueId),
    OptionsOut = [ queue(QueueId)
                 | OptionsIn
                 ].
make_socket(Address, _:OptionsIn, OptionsOut) :-
    tcp_address(Address),
    !,
    address_domain(Address, Domain),
    socket_create(Listen, [domain(Domain)]),
    tcp_setopt(Listen, reuseaddr),
    tcp_bind(Listen, Address),
    tcp_listen(Listen, 64),
    make_addr_atom('httpd', Address, QueueId),
    OptionsOut = [ queue(QueueId),
                   tcp_socket(Listen)
                 | OptionsIn
                 ].
:- if(current_predicate(unix_domain_socket/1)).
make_socket(Address, _:OptionsIn, OptionsOut) :-
    Address = unix_socket(SocketFile),
    !,
    unix_domain_socket(Listen),
    tcp_bind(Listen, SocketFile),
    tcp_listen(Listen, 64),
    make_addr_atom('httpd', Address, QueueId),
    OptionsOut = [ queue(QueueId),
                   tcp_socket(Listen)
                 | OptionsIn
                 ].
:- endif.
%   make_addr_atom(+Scheme, +Address, -Atom)
%
%   Atom is Scheme, an `@` and the textual form of Address
%   concatenated, e.g. `httpd@8080` or `http@localhost:8080`.
%
%   @error instantiation_error if Address is (partially) unbound.
%   @error domain_error(http_server_address, Address) for addresses
%          that are not recognised.

make_addr_atom(Scheme, Address, Atom) :-
    phrase(address_parts(Address), Components),
    atomic_list_concat([Scheme,@|Components], Atom).

%   address_parts(+Address)//
%
%   Emit the atomic components that make up Address.

address_parts(Unbound) -->
    { var(Unbound),
      !,
      instantiation_error(Unbound)
    }.
address_parts(Value) -->
    { atomic(Value) },
    !,
    [Value].
address_parts(Interface:Port) -->
    !,
    address_parts(Interface), [:], address_parts(Port).
address_parts(ip(B1,B2,B3,B4)) -->
    !,
    [ B1, '.', B2, '.', B3, '.', B4 ].
address_parts(unix_socket(SocketFile)) -->
    [SocketFile].
address_parts(Other) -->
    { domain_error(http_server_address, Other) }.
%   create_server(:Goal, +Address, +Options)
%
%   Create the thread that runs accept_server/3.  Its alias is built
%   from the scheme and the port.  We block until that thread sends
%   `server_started` and only then register the server in
%   current_server/6.

create_server(Goal, Address, Options) :-
    get_time(StartedAt),
    memberchk(queue(Queue), Options),
    scheme(Scheme, Options),
    autoload_https(Scheme),
    address_port(Address, Port),
    make_addr_atom(Scheme, Port, ThreadAlias),
    thread_self(Me),
    thread_create(accept_server(Goal, Me, Options), _,
                  [ alias(ThreadAlias)
                  ]),
    thread_get_message(server_started),
    assert(current_server(Port, Goal, ThreadAlias, Queue, Scheme, StartedAt)).

%   scheme(-Scheme, +Options)
%
%   Scheme is the explicit scheme(_) option, `https` if Options
%   holds ssl(_) or ssl_instance(_), and `http` otherwise.

scheme(Scheme, Options) :-
    option(scheme(Scheme), Options),
    !.
scheme(Scheme, Options) :-
    (   option(ssl(_), Options)
    ;   option(ssl_instance(_), Options)
    ),
    !,
    Scheme = https.
scheme(http, _).

%   autoload_https(+Scheme)
%
%   Load library(http/http_ssl_plugin) when an https server is
%   requested, no accept_hook/2 clause exists yet and the plugin
%   source can be found.  Succeeds silently otherwise.

autoload_https(https) :-
    \+ clause(accept_hook(_Goal, _Options), _),
    exists_source(library(http/http_ssl_plugin)),
    !,
    use_module(library(http/http_ssl_plugin)).
autoload_https(_).
%!  http_current_server(?:Goal, ?Port) is nondet.
%
%   True if Goal is the handler of the server registered for Port.

http_current_server(Goal, Port) :-
    current_server(Port, Goal, _Thread, _Queue, _Scheme, _StartTime).
http
or https
%!  http_server_property(?Port, ?Property) is nondet.
%
%   True if Property is a property of the server at Port.  An
%   address of the form Interface:Port with integer Port is reduced
%   to Port first.  The properties are:
%
%     - goal(Goal): the goal handling the requests.
%     - scheme(Scheme): the scheme stored for the server.
%     - start_time(Time): time stamp at which the server was created.

http_server_property(_Interface:Port, Property) :-
    integer(Port),
    !,
    server_property(Property, Port).
http_server_property(Port, Property) :-
    server_property(Property, Port).

server_property(goal(Goal), Port) :-
    current_server(Port, Goal, _Thread, _Queue, _Scheme, _StartTime).
server_property(scheme(Scheme), Port) :-
    current_server(Port, _Goal, _Thread, _Queue, Scheme, _StartTime).
server_property(start_time(StartTime), Port) :-
    current_server(Port, _Goal, _Thread, _Queue, _Scheme, StartTime).
%!  http_workers(+Port, ?Workers)
%
%   Query or set the number of workers of the server at Port.  If
%   Workers is an integer the pool is resized; otherwise Workers is
%   unified with the number of registered workers.
%
%   @error existence_error(http_server, Port) when resizing the pool
%          of a server that does not exist.

http_workers(Port, Count) :-
    integer(Count),
    !,
    must_be(ground, Port),
    (   current_server(Port, _, _, Queue, _, _)
    ->  resize_pool(Queue, Count)
    ;   existence_error(http_server, Port)
    ).
http_workers(Port, Count) :-
    current_server(Port, _, _, Queue, _, _),
    aggregate_all(count, queue_worker(Queue, _Thread), Count).
%!  http_add_worker(+Port, +Options) is det.
%
%   Add one worker to the server at Port.  Options are merged with
%   the options stored for the queue; values in Options take
%   precedence.
%
%   @error existence_error(http_server, Port) if no server is
%          registered for Port.

http_add_worker(Port, Options) :-
    must_be(ground, Port),
    current_server(Port, _, _, Queue, _, _),
    !,
    queue_options(Queue, Defaults),
    merge_options(Options, Defaults, Merged),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, 1, Queue, AliasPrefix, Merged).
http_add_worker(Port, _) :-
    existence_error(http_server, Port).
%!  http_current_worker(?Port, ?ThreadID) is nondet.
%
%   True if ThreadID is a worker registered on the queue of the
%   server at Port.

http_current_worker(Port, Worker) :-
    current_server(Port, _Goal, _Thread, Queue, _Scheme, _StartTime),
    queue_worker(Queue, Worker).
%   accept_server(:Goal, +Initiator, +Options)
%
%   Body of the accept thread.  Runs the accept loop until the
%   exception http_stop(Stopper) arrives.  It then removes this
%   thread's current_server/6 fact, closes the connections still
%   waiting in the queue, closes the server socket and reports
%   `http_stopped` to Stopper.

accept_server(Goal, Initiator, Options) :-
    StopSignal = http_stop(Stopper),
    catch(accept_server2(Goal, Initiator, Options), StopSignal, true),
    thread_self(Me),
    debug(http(stop), '[~p]: accept server received ~p', [Me, StopSignal]),
    retract(current_server(_Port, _, Me, Queue, _Scheme, _StartTime)),
    close_pending_accepts(Queue),
    close_server_socket(Options),
    thread_send_message(Stopper, http_stopped).

%   accept_server2(:Goal, +Initiator, +Options)
%
%   Tell Initiator that the server is running and accept connections
%   in a failure-driven loop.  Exceptions selected by
%   accept_rethrow_error/1 leave the loop; all others are printed.

accept_server2(Goal, Initiator, Options) :-
    thread_send_message(Initiator, server_started),
    repeat,
      (   catch(accept_server3(Goal, Options), Caught, true)
      ->  (   var(Caught)
          ->  fail
          ;   accept_rethrow_error(Caught)
          ->  throw(Caught)
          ;   print_message(error, Caught),
              fail
          )
      ;   print_message(error,      % internal error
                        goal_failed(accept_server3(Goal, Options))),
          fail
      ).

%   accept_server3(:Goal, +Options)
%
%   Accept a single connection, either through accept_hook/2 or by
%   accepting on the tcp_socket(_) from Options and posting the
%   client to the worker queue.

accept_server3(Goal, Options) :-
    accept_hook(Goal, Options),
    !.
accept_server3(Goal, Options) :-
    memberchk(tcp_socket(Listen), Options),
    memberchk(queue(Queue), Options),
    debug(http(connection), 'Waiting for connection', []),
    tcp_accept(Listen, Client, Peer),
    sig_atomic(send_to_worker(Queue, Client, Goal, Peer)),
    http_enough_workers(Queue, accept, Peer).

send_to_worker(Queue, Client, Goal, Peer) :-
    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
    thread_send_message(Queue, tcp_client(Client, Goal, Peer)).

accept_rethrow_error(http_stop(_)).
%   close_server_socket(+Options)
%
%   Close the server socket, using close_hook/1 if it succeeds and
%   otherwise the socket from the tcp_socket(_) option.

close_server_socket(Options) :-
    close_hook(Options),
    !.
close_server_socket(Options) :-
    memberchk(tcp_socket(Listen), Options),
    !,
    tcp_close_socket(Listen).
%   close_pending_accepts(+Queue)
%
%   Take every message that can be read from Queue without waiting
%   and close the client it describes.

close_pending_accepts(Queue) :-
    (   thread_get_message(Queue, Pending, [timeout(0)])
    ->  close_client(Pending),
        close_pending_accepts(Queue)
    ;   true
    ).

%   close_client(+Message)
%
%   Close the client of a queued message.  Plain TCP clients are
%   closed directly.  Other messages are offered to
%   discard_client_hook/1; a warning is printed if the hook fails.

close_client(tcp_client(Client, _Goal, _0Peer)) =>
    debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]),
    tcp_close_socket(Client).
close_client(Other) =>
    (   discard_client_hook(Other)
    ->  true
    ;   print_message(warning, http_close_client(Other))
    ).
%!  http_stop_server(+Port, +Options)
%
%   Stop the server at Port.  The workers are stopped first, then
%   the accept thread is signalled with http_stop(Stopper).  If it
%   does not confirm within 0.1 seconds we connect to the port
%   ourselves.  Finally the thread is joined and the queue
%   destroyed.  Options is currently ignored.

http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
    ground(Host),
    !,
    http_stop_server(Port, Options).
http_stop_server(Port, _Options) :-
    http_workers(Port, 0),                  % checks Port is ground
    current_server(Port, _, AcceptThread, Queue, _Scheme, _Start),
    retractall(queue_options(Queue, _)),
    debug(http(stop), 'Signalling HTTP server thread ~p to stop', [AcceptThread]),
    thread_self(Me),
    thread_signal(AcceptThread, throw(http_stop(Me))),
    (   thread_get_message(Me, http_stopped, [timeout(0.1)])
    ->  true
    ;   catch(connect(localhost:Port), _, true)
    ),
    thread_join(AcceptThread, _0Status),
    debug(http(stop), 'Joined HTTP server thread ~p (~p)', [AcceptThread, _0Status]),
    message_queue_destroy(Queue).

%   connect(+Address)
%
%   Open a TCP connection to Address and close the socket again.

connect(Address) :-
    setup_call_cleanup(
        tcp_socket(Probe),
        tcp_connect(Probe, Address),
        tcp_close_socket(Probe)).
%!  http_enough_workers(+Queue, +Why, +Peer) is det.
%
%   Check whether Queue has enough workers.  Nothing is done if a
%   thread is waiting on the queue or the queue size is acceptable
%   for Why (see enough/2).  Otherwise the hook
%   http:schedule_workers/1 is called with a dict describing the
%   situation.  Errors raised by the hook are printed.

http_enough_workers(Queue, _Why, _Peer) :-
    message_queue_property(Queue, waiting(_0)),
    !,
    debug(http(scheduler), '~D waiting for work; ok', [_0]).
http_enough_workers(Queue, Why, Peer) :-
    message_queue_property(Queue, size(Pending)),
    (   enough(Pending, Why)
    ->  debug(http(scheduler), '~D in queue; ok', [Pending])
    ;   current_server(Port, _, _, Queue, _, _),
        Info = _{ port:Port,
                  reason:Why,
                  peer:Peer,
                  waiting:Pending,
                  queue:Queue
                },
        debug(http(scheduler), 'Asking to reschedule: ~p', [Info]),
        catch(http:schedule_workers(Info),
              Error,
              print_message(error, Error))
    ->  true
    ;   true
    ).

%   enough(+QueueSize, +Why)
%
%   True if QueueSize requires no rescheduling for reason Why.

enough(0, _).
enough(1, keep_alive).                  % I will be ready myself
accept
for a new connection or keep_alive
if a
worker tries to reschedule itself.
Note that, when called with reason:accept
, we are called in
the time critical main accept loop. An implementation of this
hook shall typically send the event to a thread dedicated to
dynamic worker-pool management.
                 /*******************************
                 *    WORKER QUEUE OPERATIONS   *
                 *******************************/
%   create_workers(+Options)
%
%   Create the message queue named by queue(Queue) and start
%   workers(N) worker threads on it (default 5).  Options is then
%   stored in queue_options/2.  Errors from creating the queue are
%   ignored.

create_workers(Options) :-
    option(workers(Count), Options, 5),
    option(queue(Queue), Options),
    catch(message_queue_create(Queue), _, true),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, Count, Queue, AliasPrefix, Options),
    assert(queue_options(Queue, Options)).

%   create_workers(+From, +To, +Queue, +AliasPrefix, +Options)
%
%   Create one worker for each integer in From..To.  Each thread
%   gets an alias generated from AliasPrefix and is registered in
%   queue_worker/2.  Options is passed both to http_worker/1 and to
%   thread_create/3.

create_workers(Index, Last, _, _, _) :-
    Index > Last,
    !.
create_workers(Index, Last, Queue, AliasPrefix, Options) :-
    gensym(AliasPrefix, ThreadAlias),
    thread_create(http_worker(Options), Thread,
                  [ alias(ThreadAlias)
                  | Options
                  ]),
    assertz(queue_worker(Queue, Thread)),
    Next is Index + 1,
    create_workers(Next, Last, Queue, AliasPrefix, Options).
%   resize_pool(+Queue, +Size)
%
%   Bring the number of workers on Queue to Size.  Missing workers
%   are created using the stored queue options.  Excess workers are
%   asked to stop by posting quit(Me) messages, after which we wait
%   for the same number of quitted(_) replies.

resize_pool(Queue, Size) :-
    findall(Thread, queue_worker(Queue, Thread), Threads),
    length(Threads, Current),
    (   Current < Size
    ->  queue_options(Queue, Options),
        atom_concat(Queue, '_', AliasPrefix),
        First is Current+1,
        create_workers(First, Size, Queue, AliasPrefix, Options)
    ;   Current == Size
    ->  true
    ;   Current > Size
    ->  Surplus is Current - Size,
        thread_self(Me),
        forall(between(1, Surplus, _), thread_send_message(Queue, quit(Me))),
        forall(between(1, Surplus, _), thread_get_message(quitted(_)))
    ).
If the message quit(Sender)
is read from the queue, the worker
stops.
%   http_worker(+Options)
%
%   Body of a worker thread: a failure-driven loop that fetches jobs
%   from the queue named in Options and processes them.  The loop
%   ends when quit(Sender) is read.  Unless Sender is `idle`, the
%   worker is removed from queue_worker/2 and quitted(Self) is sent
%   to Sender.  If processing a request raises an error or fails,
%   the problem is printed and the connection is closed.

http_worker(Options) :-
    debug(http(scheduler), 'New worker', []),
    prolog_listen(this_thread_exit, done_worker),
    option(queue(Queue), Options),
    option(max_idle_time(MaxIdle), Options, infinite),
    thread_repeat_wait(get_work(Queue, Job, MaxIdle)),
      debug(http(worker), 'Waiting for a job ...', []),
      debug(http(worker), 'Got job ~p', [Job]),
      (   Job = quit(Sender)
      ->  !,                            % leave the repeat loop
          thread_self(Me),
          thread_detach(Me),
          (   Sender == idle
          ->  true
          ;   retract(queue_worker(Queue, Me)),
              thread_send_message(Sender, quitted(Me))
          )
      ;   open_client(Job, Queue, Goal, In, Out,
                      Options, ClientOptions),
          (   catch(http_process(Goal, In, Out, ClientOptions),
                    Error, true)
          ->  true
          ;   Error = goal_failed(http_process/4)
          ),
          (   var(Error)
          ->  fail
          ;   current_message_level(Error, Level),
              print_message(Level, Error),
              memberchk(peer(Peer), ClientOptions),
              close_connection(Peer, In, Out),
              fail
          )
      ).

%   get_work(+Queue, -Message, +MaxIdle)
%
%   Wait for a message on Queue.  If MaxIdle is not `infinite` and
%   no message arrives within MaxIdle seconds, Message is
%   quit(idle).

get_work(Queue, Job, infinite) :-
    !,
    thread_get_message(Queue, Job).
get_work(Queue, Job, MaxIdle) :-
    (   thread_get_message(Queue, Job, [timeout(MaxIdle)])
    ->  true
    ;   Job = quit(idle)
    ).
%   open_client(+Message, +Queue, -Goal, -In, -Out,
%               +Options, -ClientOptions)
%
%   Open the connection described by Message.  For a requeue/4
%   message the existing streams are reused after verifying that the
%   keep-alive connection has data within keep_alive_timeout(_)
%   (default 2).  Otherwise open_client/6 opens the streams and
%   ClientOptions is extended with pool(_) and timeout(_)
%   (default 60).

open_client(requeue(In, Out, Goal, ClientOptions),
            _, Goal, In, Out, Options, ClientOptions) :-
    !,
    memberchk(peer(Peer), ClientOptions),
    option(keep_alive_timeout(KeepAlive), Options, 2),
    check_keep_alive_connection(In, KeepAlive, Peer, In, Out).
open_client(Message, Queue, Goal, In, Out, Options,
            [ pool(client(Queue, Goal, In, Out)),
              timeout(Timeout)
            | Opened
            ]) :-
    catch(open_client(Message, Goal, In, Out, Opened, Options),
          Error, report_error(Error)),
    option(timeout(Timeout), Options, 60),
    (   debugging(http(connection))
    ->  memberchk(peer(Peer), Opened),
        debug(http(connection), 'Opened connection from ~p', [Peer])
    ;   true
    ).
%   open_client(+Message, -Goal, -In, -Out, -ClientOptions, +Options)
%
%   Open the streams for a queued client, using open_client_hook/6
%   if it succeeds.  For a plain tcp_client/3 message the socket is
%   opened and ClientOptions is `[peer(Peer), protocol(http)]`.

open_client(Message, Goal, In, Out, ClientOptions, Options) :-
    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
    !.
open_client(tcp_client(Client, Goal, Peer), Goal, In, Out,
            [ peer(Peer),
              protocol(http)
            ], _) :-
    tcp_open_socket(Client, In, Out).

%   report_error(+Error)
%
%   Print Error as an error message and fail.

report_error(Error) :-
    print_message(error, Error),
    fail.
%   check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out)
%
%   Wait up to TimeOut for input on In by peeking a character code.
%   If a code other than end-of-file arrives, the previous stream
%   timeout is restored and we succeed.  On an exception or
%   end-of-file the connection is closed and we fail.

check_keep_alive_connection(In, TimeOut, Peer, In, Out) :-
    stream_property(In, timeout(Saved)),
    set_stream(In, timeout(TimeOut)),
    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
    catch(peek_code(In, Next), Error, true),
    (   var(Error),                     % no exception
        Next \== -1                     % no end-of-file
    ->  set_stream(In, timeout(Saved)),
        debug(http(keep_alive), '\tre-using keep-alive connection', [])
    ;   (   Next == -1
        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
        ),
        close_connection(Peer, In, Out),
        fail
    ).
%   done_worker
%
%   Registered by http_worker/1 for the `this_thread_exit` event.
%   If the thread is still registered in queue_worker/2 it is
%   removed and recreate_worker/2 is tried.  A restart message is
%   printed on success and the exit status otherwise.  The second
%   clause handles threads that are no longer registered.

done_worker :-
    thread_self(Me),
    thread_detach(Me),
    retract(queue_worker(Queue, Me)),
    thread_property(Me, status(ExitStatus)),
    !,
    (   catch(recreate_worker(ExitStatus, Queue), _, fail)
    ->  print_message(informational,
                      httpd_restarted_worker(Me))
    ;   done_status_message_level(ExitStatus, Level),
        print_message(Level,
                      httpd_stopped_worker(Me, ExitStatus))
    ).
done_worker :-                                  % received quit(Sender)
    thread_self(Me),
    thread_property(Me, status(ExitStatus)),
    done_status_message_level(ExitStatus, Level),
    print_message(Level,
                  httpd_stopped_worker(Me, ExitStatus)).

%   done_status_message_level(+Status, -Level)
%
%   Level is `silent` for normal completion, abort and halt, and
%   `informational` for any other status.

done_status_message_level(true, silent) :- !.
done_status_message_level(exception('$aborted'), silent) :- !.
done_status_message_level(exception(unwind(abort)), silent) :- !.
done_status_message_level(exception(unwind(halt(_))), silent) :- !.
done_status_message_level(_, informational).
The first clause deals with the possibility that we cannot write to
user_error
. This is possible when Prolog is started as a service
using some service managers. Would be nice if we could write an
error, but where?
%   recreate_worker(+Status, +Queue)
%
%   Start a replacement worker on Queue if the old one died from an
%   exception listed in recreate_on_error/1.  If the exception is an
%   I/O error writing to `user_error`, the process halts with exit
%   code 2 instead.

recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
    halt(2).
recreate_worker(exception(Exception), Queue) :-
    recreate_on_error(Exception),
    queue_options(Queue, Options),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, 1, Queue, AliasPrefix, Options).

%   recreate_on_error(+Exception)
%
%   True for exceptions after which a worker must be replaced.

recreate_on_error('$aborted').
recreate_on_error(unwind(abort)).
recreate_on_error(time_limit_exceeded).
%   message_level(+Exception, -Level)
%
%   Multifile table that determines the level at which an exception
%   from request processing is printed.

:- multifile
    message_level/2.

message_level(error(io_error(read, _), _),               silent).
message_level(error(socket_error(epipe,_), _),           silent).
message_level(error(http_write_short(_Obj,_Written), _), silent).
message_level(error(timeout_error(read, _), _),          informational).
message_level(keep_alive_timeout,                        silent).

%   current_message_level(+Term, -Level)
%
%   Level is the first level message_level/2 gives for Term, or
%   `error` if there is none.

current_message_level(Term, Level) :-
    (   message_level(Term, Known)
    ->  Level = Known
    ;   Level = error
    ).
%   read_remaining_request(+StartBody, +Request)
%
%   If Request holds content_length(Len), skip whatever part of the
%   body has not yet been read from the input stream.  StartBody is
%   the byte count at which the body starts.  Succeeds without doing
%   anything if there is no content length.

read_remaining_request(StartBody, Request) :-
    memberchk(content_length(Length), Request),
    !,
    memberchk(pool(client(_Queue, _Goal, In, _Out)), Request),
    byte_count(In, Position),
    Unread is StartBody+Length-Position,
    read_incomplete(In, Unread).
read_remaining_request(_, _Request).

%   read_incomplete(+In, +Left)
%
%   Discard Left bytes from In by copying them to a null stream.
%   Fails if copying raises an error(_,_) exception.

read_incomplete(_, 0) :-
    !.
read_incomplete(In, Left) :-
    % Left < 1 000 000,   % Optionally close anyway.
    catch(setup_call_cleanup(
              open_null_stream(Sink),
              copy_stream_data(In, Sink, Left),
              close(Sink)),
          error(_,_),
          fail).
%!  http_requeue(+Header)
%
%   Put the connection described by Header back on the worker queue
%   so that it can be picked up again.  Only the pool(_), peer(_)
%   and protocol(_) fields are kept.  Fails, after a debug message,
%   if the connection cannot be requeued.

http_requeue(Header) :-
    requeue_header(Header, Kept),
    memberchk(pool(client(Queue, Goal, In, Out)), Kept),
    memberchk(peer(Peer), Kept),
    http_enough_workers(Queue, keep_alive, Peer),
    thread_send_message(Queue, requeue(In, Out, Goal, Kept)),
    !.
http_requeue(Header) :-
    debug(http(error), 'Re-queue failed: ~p', [Header]),
    fail.

%   requeue_header(+Header, -Kept)
%
%   Kept holds the fields of Header for which requeue_keep/1 is
%   true, in their original order.

requeue_header([], []).
requeue_header([Field|Rest], [Field|Kept]) :-
    requeue_keep(Field),
    !,
    requeue_header(Rest, Kept).
requeue_header([_|Rest], Kept) :-
    requeue_header(Rest, Kept).

requeue_keep(pool(_)).
requeue_keep(peer(_)).
requeue_keep(protocol(_)).
%   http_process(:Goal, +In, +Out, +Options)
%
%   Handle a single request.  The timeout(_) option (default 60) is
%   set on both streams before http_wrapper/5 runs Goal.  What
%   happens to the connection afterwards is decided by next/3.

http_process(Goal, In, Out, Options) :-
    debug(http(server), 'Running server goal ~p on ~p -> ~p',
          [Goal, In, Out]),
    option(timeout(Timeout), Options, 60),
    set_stream(In, timeout(Timeout)),
    set_stream(Out, timeout(Timeout)),
    http_wrapper(Goal, In, Out, Connection,
                 [ request(Request),
                   byte_count(StartBody)
                 | Options
                 ]),
    next(Connection, StartBody, Request).

%   next(+Connection, +StartBody, +Request)
%
%   Dispatch on how the request ended.  Prints a warning instead of
%   failing if next_/3 fails.

next(Connection, StartBody, Request) :-
    next_(Connection, StartBody, Request),
    !.
next(Connection, StartBody, Request) :-
    print_message(warning, goal_failed(next(Connection,StartBody,Request))).

%   next_(+Connection, +StartBody, +Request)
%
%   - switch_protocol(Goal, _): call Goal on the streams; close the
%     connection if it fails or raises an error.
%   - spawned(ThreadId): another thread took over; nothing to do.
%   - keep-alive (case insensitive): skip unread body, requeue.
%   - otherwise, or if requeueing fails: close the connection.

next_(switch_protocol(SwitchGoal, _SwitchOptions), _, Request) :-
    !,
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    (   catch(call(SwitchGoal, In, Out), Error,
              ( print_message(error, Error),
                fail))
    ->  true
    ;   http_close_connection(Request)
    ).
next_(spawned(Thread), _, _) :-
    !,
    debug(http(spawn), 'Handler spawned to thread ~w', [Thread]).
next_(Connection, StartBody, Request) :-
    downcase_atom(Connection, 'keep-alive'),
    read_remaining_request(StartBody, Request),
    http_requeue(Request),
    !.
next_(_, _, Request) :-
    http_close_connection(Request).
%!  http_close_connection(+Request)
%
%   Close the streams stored in the pool(_) field of Request.

http_close_connection(Request) :-
    memberchk(peer(Peer), Request),
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    close_connection(Peer, In, Out).
%   close_connection(+Peer, +In, +Out)
%
%   Close both streams with force(true), ignoring any exception.
%   Peer is only used for the debug message.

close_connection(Peer, In, Out) :-
    debug(http(connection), 'Closing connection from ~p', [Peer]),
    ForceClose = [force(true)],
    catch(close(In, ForceClose), _, true),
    catch(close(Out, ForceClose), _, true).
If a pool does not exist, this predicate calls the multifile hook create_pool/1 to create it. If this predicate succeeds the operation is retried.
%!  http_spawn(:Goal, +Options)
%
%   Continue handling the current request in a new detached thread
%   running Goal.  With a pool(Pool) option the thread is created in
%   that thread pool:
%
%     - If the pool is full, http_reply(busy) is thrown.
%     - If the pool does not exist and create_pool/1 succeeds, the
%       operation is retried.
%     - Any other error is re-thrown.
%
%   Without pool(_), a plain thread is created using Options.

http_spawn(Goal, Options) :-
    select_option(pool(Pool), Options, ThreadOptions),
    !,
    current_output(CGI),
    Caught = error(Formal, _),
    catch(thread_create_in_pool(Pool,
                                wrap_spawned(CGI, Goal), Thread,
                                [ detached(true)
                                | ThreadOptions
                                ]),
          Caught,
          true),
    (   var(Formal)                     % no error: thread is running
    ->  http_spawned(Thread)
    ;   Formal = resource_error(threads_in_pool(_))
    ->  throw(http_reply(busy))
    ;   Formal = existence_error(thread_pool, Pool),
        create_pool(Pool)
    ->  http_spawn(Goal, Options)
    ;   throw(Caught)
    ).
http_spawn(Goal, Options) :-
    current_output(CGI),
    thread_create(wrap_spawned(CGI, Goal), Thread,
                  [ detached(true)
                  | Options
                  ]),
    http_spawned(Thread).

%   wrap_spawned(+CGI, :Goal)
%
%   Entry point of the spawned thread.  Makes CGI the current
%   output, runs Goal through http_wrap_spawned/3 and handles the
%   connection using next/3.

wrap_spawned(CGI, Goal) :-
    set_output(CGI),
    cgi_property(CGI, request(Request)),
    memberchk(input(In), Request),
    byte_count(In, StartBody),
    http_wrap_spawned(Goal, Request, Connection),
    next(Connection, StartBody, Request).
%   create_pool(+Pool)
%
%   Create the thread pool Pool on demand.  The hook
%   http:create_pool/1 is tried first; a permission error on
%   creating the pool is ignored there.  If the hook fails, or on
%   backtracking, a default pool of size 10 is created after
%   printing an informational message.

create_pool(Pool) :-
    E = error(permission_error(create, thread_pool, Pool), _),
    catch(http:create_pool(Pool), E, true).
create_pool(Pool) :-
    print_message(informational, httpd(created_pool(Pool))),
    thread_pool_create(Pool, 10, []).


                 /*******************************
                 *         WAIT POLICIES        *
                 *******************************/

% thread_repeat_wait/1 calls its argument as a goal without extra
% arguments, hence the meta-argument specifier 0.
:- meta_predicate
    thread_repeat_wait(0).
repeat, thread_idle(Goal)
, choosing whether to use a
long
or short
%   thread_repeat_wait(:Goal)
%
%   Non-deterministic loop that calls Goal on each iteration.  The
%   idle time is chosen from the moving average of the time between
%   iterations (see long/2).  For `brief` intervals Goal is called
%   directly; otherwise it is run through thread_idle/2 with `short`
%   or `long`.

thread_repeat_wait(Goal) :-
    new_rate_mma(5, 1000, State),
    repeat,
      notrace,
      update_rate_mma(State, Rate),
      long(Rate, Duration),
      (   Duration == brief
      ->  call(Goal)
      ;   thread_idle(Goal, Duration)
      ).

%   long(+MMA, -Duration)
%
%   Classify the average interval MMA (in seconds): `brief` below
%   0.05, `short` below 1 and `long` otherwise.

long(Rate, brief) :-
    Rate < 0.05,
    !.
long(Rate, short) :-
    Rate < 1,
    !.
long(_, long).
%   new_rate_mma(+N, +Resolution, -State)
%
%   Create the state for a modified moving average over the time
%   between calls to update_rate_mma/2.  State is a term
%   rstate(Base, Last, MaxI, Resolution, N, MMA) that is updated
%   destructively.  Time stamps are integers counting
%   1/Resolution seconds since Base.

new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :-
    current_prolog_flag(max_tagged_integer, MaxI),
    get_time(Base).

%   update_rate_mma(!State, -MMA)
%
%   Register an event and unify MMA with the updated average
%   interval in seconds.  Base is reset when the integer stamp
%   exceeds the max_tagged_integer flag value.

update_rate_mma(State, MMAr) :-
    State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
    get_time(Now),
    Stamp is round((Now-Base)*Resolution),
    (   Stamp > MaxI
    ->  nb_setarg(1, State, Now),
        nb_setarg(2, State, 0)
    ;   true
    ),
    Diff is Stamp-Last,
    nb_setarg(2, State, Stamp),
    MMA is round(((N-1)*MMA0+Diff)/N),
    nb_setarg(6, State, MMA),
    MMAr is MMA/float(Resolution).


                 /*******************************
                 *            MESSAGES          *
                 *******************************/

:- multifile
    prolog:message/3.

% These rules must extend the multifile hook prolog:message//1 to be
% seen by print_message/2, so the heads are module qualified.

prolog:message(httpd_started_server(Port, Options)) -->
    [ 'Started server at '-[] ],
    http_root(Port, Options).
prolog:message(httpd_stopped_worker(Self, Status)) -->
    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
prolog:message(httpd_restarted_worker(Self)) -->
    [ 'Replaced aborted worker ~p'-[Self] ].
prolog:message(httpd(created_pool(Pool))) -->
    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
      'Create this pool at startup-time or define the hook ', nl,
      'http:create_pool/1 to avoid this message and create a ', nl,
      'pool that fits the usage-profile.'
    ].

%   http_root(+Address, +Options)//
%
%   Emit the landing page of the server as a url(URI) element.

http_root(Address, Options) -->
    { landing_page(Address, URI, Options) },
    [ url(URI) ].

%   landing_page(+Address, -URI, +Options)
%
%   URI is the text shown for the server at Address.  The port is
%   omitted from the URL if it is the default port for the scheme.
%   For Unix domain sockets URI is a descriptive string.

landing_page(Host:Port, URI, Options) :-
    !,
    must_be(atom, Host),
    must_be(integer, Port),
    http_server_property(Port, scheme(Scheme)),
    (   default_port(Scheme, Port)
    ->  format(atom(Base), '~w://~w', [Scheme, Host])
    ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
    ),
    entry_page(Base, URI, Options).
landing_page(unix_socket(Path), URI, _Options) :-
    !,
    format(string(URI), 'Unix domain socket "~w"', [Path]).
% A bare port is reported as a server on `localhost`.
landing_page(Port, URI, Options) :-
    landing_page(localhost:Port, URI, Options).

%   default_port(?Scheme, ?Port)
%
%   Port is the default port for Scheme.

default_port(http, 80).
default_port(https, 443).

%   entry_page(+Base, -URI, +Options)
%
%   URI is the entry page resolved against Base.  The page is the
%   entry_page(_) option if given and the HTTP location root(.)
%   otherwise.

entry_page(Base, URI, Options) :-
    option(entry_page(Entry), Options),
    !,
    uri_resolve(Entry, Base, URI).
entry_page(Base, URI, _) :-
    http_absolute_location(root(.), Entry, []),
    uri_resolve(Entry, Base, URI).
Threaded HTTP server
Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.
This library defines the HTTP server frontend of choice for SWI-Prolog. It is based on the multi-threading capabilities of SWI-Prolog and thus exploits multiple cores to serve requests concurrently. The server scales well and can cooperate with library(thread_pool) to control the number of concurrent requests of a given type. For example, it can be configured to handle 200 file download requests concurrently, 2 requests that potentially use a lot of memory and 8 requests that use a lot of CPU resources.
On Unix systems, this library can be combined with library(http/http_unix_daemon) to realise a proper Unix service process that creates a web server at port 80, runs under a specific account, optionally detaches from the controlling terminal, etc.
Combined with library(http/http_ssl_plugin) from the SSL package, this library can be used to create an HTTPS server. See <plbase>/doc/packages/examples/ssl/https for an example server using a self-signed SSL certificate. */