View source with formatted comments or as raw
    1/*  Part of SWI-Prolog
    2
    3    Author:        Jan Wielemaker
    4    E-mail:        J.Wielemaker@vu.nl
    5    WWW:           http://www.swi-prolog.org
    6    Copyright (c)  2002-2024, University of Amsterdam
    7                              VU University Amsterdam
    8                              CWI, Amsterdam
    9                              SWI-Prolog Solutions b.v.
   10    All rights reserved.
   11
   12    Redistribution and use in source and binary forms, with or without
   13    modification, are permitted provided that the following conditions
   14    are met:
   15
   16    1. Redistributions of source code must retain the above copyright
   17       notice, this list of conditions and the following disclaimer.
   18
   19    2. Redistributions in binary form must reproduce the above copyright
   20       notice, this list of conditions and the following disclaimer in
   21       the documentation and/or other materials provided with the
   22       distribution.
   23
   24    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
   25    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
   26    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
   27    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
   28    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
   29    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
   30    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
   31    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
   32    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
   33    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
   34    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
   35    POSSIBILITY OF SUCH DAMAGE.
   36*/
   37
   38:- module(thread_httpd,
   39          [ http_current_server/2,      % ?:Goal, ?Port
   40            http_server_property/2,     % ?Port, ?Property
   41            http_server/2,              % :Goal, +Options
   42            http_workers/2,             % +Port, ?WorkerCount
   43            http_add_worker/2,          % +Port, +Options
   44            http_current_worker/2,      % ?Port, ?ThreadID
   45            http_stop_server/2,         % +Port, +Options
   46            http_spawn/2,               % :Goal, +Options
   47
   48            http_requeue/1,             % +Request
   49            http_close_connection/1,    % +Request
   50            http_enough_workers/3       % +Queue, +Why, +Peer
   51          ]).   52:- use_module(library(debug)).   53:- use_module(library(error)).   54:- use_module(library(option)).   55:- use_module(library(socket)).   56:- use_module(library(thread_pool)).   57:- use_module(library(gensym)).   58:- use_module(http_wrapper).   59:- use_module(http_path).   60:- use_module(http_stream).   61
   62:- autoload(library(uri), [uri_resolve/3]).   63:- autoload(library(aggregate), [aggregate_all/3]).   64
% Declare the options processed by the option-taking predicates of this
% module.  The pass_to/2 entries name the predicates to which the
% option list is handed on.
:- predicate_options(http_server/2, 2,
                     [ port(any),
                       unix_socket(atom),
                       entry_page(atom),
                       tcp_socket(any),
                       workers(positive_integer),
                       timeout(number),
                       keep_alive_timeout(number),
                       silent(boolean),
                       ssl(list(any)),  % if http/http_ssl_plugin is loaded
                       pass_to(system:thread_create/3, 3)
                     ]).
:- predicate_options(http_spawn/2, 2,
                     [ pool(atom),
                       pass_to(system:thread_create/3, 3),
                       pass_to(thread_pool:thread_create_in_pool/4, 4)
                     ]).
:- predicate_options(http_add_worker/2, 2,
                     [ timeout(number),
                       keep_alive_timeout(number),
                       max_idle_time(number),
                       pass_to(system:thread_create/3, 3)
                     ]).
   89/** <module> Threaded HTTP server
   90
   91Most   code   doesn't   need  to   use  this   directly;  instead   use
   92library(http/http_server),  which  combines   this  library  with   the
   93typical HTTP libraries that most servers need.
   94
   95This library defines the HTTP server  frontend of choice for SWI-Prolog.
   96It is based on the multi-threading   capabilities of SWI-Prolog and thus
   97exploits multiple cores  to  serve   requests  concurrently.  The server
   98scales well and can cooperate with   library(thread_pool) to control the
   99number of concurrent requests of a given   type.  For example, it can be
  100configured to handle 200 file download requests concurrently, 2 requests
that potentially use a lot of memory and 8 requests that use a lot of
  102CPU resources.
  103
  104On   Unix   systems,    this    library     can    be    combined   with
  105library(http/http_unix_daemon) to realise a proper  Unix service process
  106that creates a web server at  port   80,  runs under a specific account,
  107optionally detaches from the controlling terminal, etc.
  108
  109Combined with library(http/http_ssl_plugin) from the   SSL package, this
  110library   can   be   used   to    create     an    HTTPS   server.   See
  111<plbase>/doc/packages/examples/ssl/https for an example   server using a
  112self-signed SSL certificate.
  113*/
  114
% Goal arguments are module sensitive.  The `:` for the second argument
% of http_server/2 makes its option list module qualified as well.
:- meta_predicate
    http_server(1, :),
    http_current_server(1, ?),
    http_spawn(0, +).

% Server administration that is modified at runtime.
:- dynamic
    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
    queue_worker/2,         % Queue, ThreadID
    queue_options/2.        % Queue, Options

% Hooks for which other files may supply clauses.
:- multifile
    make_socket_hook/3,     % tried by make_socket/3 for TCP addresses
    accept_hook/2,          % tried first by accept_server3/2
    close_hook/1,           % tried first by close_server_socket/1
    open_client_hook/6,     % tried first by open_client/6
    discard_client_hook/1,  % tried by close_client/1 for other messages
    http:create_pool/1,
    http:schedule_workers/1. % called by http_enough_workers/3

% The argument of thread_repeat_wait/1 is a goal.
:- meta_predicate
    thread_repeat_wait(0).
  137%!  http_server(:Goal, :Options) is det.
  138%
  139%   Create a server at Port that calls Goal for each parsed request.
  140%   Options provide a list of options. Defined options are
  141%
  142%     * port(?Address)
  143%     Port to bind to.  Address is either a port or a term
  144%     Host:Port. The port may be a variable, causing the system
  145%     to select a free port.  See tcp_bind/2.
  146%
  147%     * unix_socket(+Path)
  148%     Instead of binding to a TCP port, bind to a _Unix Domain
  149%     Socket_ at Path.
  150%
  151%     * entry_page(+URI)
  152%     Affects the message printed while the server is started.
  153%     Interpreted as a URI relative to the server root.
  154%
  155%     * tcp_socket(+Socket)
  156%     If provided, use this socket instead of the creating one and
  157%     binding it to an address.  The socket must be bound to an
  158%     address.  Note that this also allows binding an HTTP server to
  159%     a Unix domain socket (``AF_UNIX``).  See socket_create/2.
  160%
  161%     * workers(+Count)
  162%     Determine the number of worker threads.  Default is 5.  This
  163%     is fine for small scale usage.  Public servers typically need
  164%     a higher number.
  165%
  166%     * timeout(+Seconds)
  167%     Maximum time of inactivity trying to read the request after a
  168%     connection has been opened.  Default is 60 seconds.  See
  169%     set_stream/1 using the _timeout_ option.
  170%
  171%     * keep_alive_timeout(+Seconds)
  172%     Time to keep `Keep alive' connections alive.  Default is
  173%     2 seconds.
  174%
  175%     * stack_limit(+Bytes)
  176%     Stack limit to use for the workers.  The default is inherited
  177%     from the `main` thread.
  178%     If you need to control resource usage you may consider the
  179%     `spawn` option of http_handler/3 and library(thread_pool).
  180%
  181%     * silent(Bool)
  182%     If `true` (default `false`), do not print an informational
  183%     message that the server was started.
  184%
  185%   A  typical  initialization  for  an    HTTP   server  that  uses
  186%   http_dispatch/1 to relay requests to predicates is:
  187%
  188%     ==
  189%     :- use_module(library(http/thread_httpd)).
  190%     :- use_module(library(http/http_dispatch)).
  191%
  192%     start_server(Port) :-
  193%         http_server(http_dispatch, [port(Port)]).
  194%     ==
  195%
  196%   Note that multiple servers  can  coexist   in  the  same  Prolog
  197%   process. A notable application of this is   to have both an HTTP
  198%   and HTTPS server, where the HTTP   server redirects to the HTTPS
  199%   server for handling sensitive requests.
  200
  201http_server(Goal, M:Options0) :-
  202    server_address(Address, Options0),
  203    !,
  204    make_socket(Address, M:Options0, Options),
  205    create_workers(Options),
  206    create_server(Goal, Address, Options),
  207    (   option(silent(true), Options0)
  208    ->  true
  209    ;   print_message(informational,
  210                      httpd_started_server(Address, Options0))
  211    ).
  212http_server(_Goal, _:Options0) :-
  213    existence_error(server_address, Options0).
  214
  215server_address(Address, Options) :-
  216    (   option(port(Port), Options)
  217    ->  Address = Port
  218    ;   option(unix_socket(Path), Options)
  219    ->  Address = unix_socket(Path)
  220    ).
  221
  222address_port(_IFace:Port, Port) :- !.
  223address_port(unix_socket(Path), Path) :- !.
  224address_port(Address, Address) :- !.
  225
  226tcp_address(Port) :-
  227    var(Port),
  228    !.
  229tcp_address(Port) :-
  230    integer(Port),
  231    !.
  232tcp_address(_Iface:_Port).
  233
  234address_domain(localhost:_Port, Domain) =>
  235    Domain = inet.
  236address_domain(Iface:_Port, Domain) =>
  237    (   catch(ip_name(IP, Iface), error(_,_), fail),
  238        functor(IP, ip, 8)
  239    ->  Domain = inet6
  240    ;   Domain = inet
  241    ).
  242address_domain(_, Domain) =>
  243    Domain = inet.
  244
  245
  246%!  make_socket(+Address, :OptionsIn, -OptionsOut) is det.
  247%
  248%   Create the HTTP server socket and  worker pool queue. OptionsOut
  249%   is guaranteed to hold the option queue(QueueId).
  250%
  251%   @arg   OptionsIn   is   qualified   to     allow   passing   the
  252%   module-sensitive ssl option argument.
  253
  254make_socket(Address, M:Options0, Options) :-
  255    tcp_address(Address),
  256    make_socket_hook(Address, M:Options0, Options),
  257    !.
  258make_socket(Address, _:Options0, Options) :-
  259    option(tcp_socket(_), Options0),
  260    !,
  261    make_addr_atom('httpd', Address, Queue),
  262    Options = [ queue(Queue)
  263              | Options0
  264              ].
  265make_socket(Address, _:Options0, Options) :-
  266    tcp_address(Address),
  267    !,
  268    address_domain(Address, Domain),
  269    socket_create(Socket, [domain(Domain)]),
  270    tcp_setopt(Socket, reuseaddr),
  271    tcp_bind(Socket, Address),
  272    tcp_listen(Socket, 64),
  273    make_addr_atom('httpd', Address, Queue),
  274    Options = [ queue(Queue),
  275                tcp_socket(Socket)
  276              | Options0
  277              ].
  278:- if(current_predicate(unix_domain_socket/1)).  279make_socket(Address, _:Options0, Options) :-
  280    Address = unix_socket(Path),
  281    !,
  282    unix_domain_socket(Socket),
  283    tcp_bind(Socket, Path),
  284    tcp_listen(Socket, 64),
  285    make_addr_atom('httpd', Address, Queue),
  286    Options = [ queue(Queue),
  287                tcp_socket(Socket)
  288              | Options0
  289              ].
  290:- endif.  291
  292%!  make_addr_atom(+Scheme, +Address, -Atom) is det.
  293%
  294%   Create an atom that identifies  the   server's  queue and thread
  295%   resources.
  296
  297make_addr_atom(Scheme, Address, Atom) :-
  298    phrase(address_parts(Address), Parts),
  299    atomic_list_concat([Scheme,@|Parts], Atom).
  300
  301address_parts(Var) -->
  302    { var(Var),
  303      !,
  304      instantiation_error(Var)
  305    }.
  306address_parts(Atomic) -->
  307    { atomic(Atomic) },
  308    !,
  309    [Atomic].
  310address_parts(Host:Port) -->
  311    !,
  312    address_parts(Host), [:], address_parts(Port).
  313address_parts(ip(A,B,C,D)) -->
  314    !,
  315    [ A, '.', B, '.', C, '.', D ].
  316address_parts(unix_socket(Path)) -->
  317    [Path].
  318address_parts(Address) -->
  319    { domain_error(http_server_address, Address) }.
  320
  321
  322%!  create_server(:Goal, +Address, +Options) is det.
  323%
  324%   Create the main server thread that runs accept_server/2 to
  325%   listen to new requests.
  326
  327create_server(Goal, Address, Options) :-
  328    get_time(StartTime),
  329    memberchk(queue(Queue), Options),
  330    scheme(Scheme, Options),
  331    autoload_https(Scheme),
  332    address_port(Address, Port),
  333    make_addr_atom(Scheme, Port, Alias),
  334    thread_self(Initiator),
  335    thread_create(accept_server(Goal, Initiator, Options), _,
  336                  [ alias(Alias)
  337                  ]),
  338    thread_get_message(server_started),
  339    assert(current_server(Port, Goal, Alias, Queue, Scheme, StartTime)).
  340
  341scheme(Scheme, Options) :-
  342    option(scheme(Scheme), Options),
  343    !.
  344scheme(Scheme, Options) :-
  345    (   option(ssl(_), Options)
  346    ;   option(ssl_instance(_), Options)
  347    ),
  348    !,
  349    Scheme = https.
  350scheme(http, _).
  351
  352autoload_https(https) :-
  353    \+ clause(accept_hook(_Goal, _Options), _),
  354    exists_source(library(http/http_ssl_plugin)),
  355    !,
  356    use_module(library(http/http_ssl_plugin)).
  357autoload_https(_).
  358
  359%!  http_current_server(:Goal, ?Port) is nondet.
  360%
  361%   True if Goal is the goal of a server at Port.
  362%
  363%   @deprecated Use http_server_property(Port, goal(Goal))
  364
  365http_current_server(Goal, Port) :-
  366    current_server(Port, Goal, _, _, _, _).
  367
  368
  369%!  http_server_property(?Port, ?Property) is nondet.
  370%
  371%   True if Property is a property of the HTTP server running at
  372%   Port.  Defined properties are:
  373%
  374%       * goal(:Goal)
  375%       Goal used to start the server. This is often
  376%       http_dispatch/1.
  377%       * scheme(-Scheme)
  378%       Scheme is one of `http` or `https`.
  379%       * start_time(?Time)
  380%       Time-stamp when the server was created.
  381
  382http_server_property(_:Port, Property) :-
  383    integer(Port),
  384    !,
  385    server_property(Property, Port).
  386http_server_property(Port, Property) :-
  387    server_property(Property, Port).
  388
  389server_property(goal(Goal), Port) :-
  390    current_server(Port, Goal, _, _, _, _).
  391server_property(scheme(Scheme), Port) :-
  392    current_server(Port, _, _, _, Scheme, _).
  393server_property(start_time(Time), Port) :-
  394    current_server(Port, _, _, _, _, Time).
  395
  396
  397%!  http_workers(?Port, -Workers) is nondet.
  398%!  http_workers(+Port, +Workers:int) is det.
  399%
  400%   Query or set the number of workers for  the server at this port. The
  401%   number of workers is dynamically modified. Setting it to 1 (one) can
  402%   be used to profile the worker using tprofile/1.
  403%
  404%   @see library(http/http_dyn_workers) implements dynamic management of
  405%   the worker pool depending on usage.
  406
  407http_workers(Port, Workers) :-
  408    integer(Workers),
  409    !,
  410    must_be(ground, Port),
  411    (   current_server(Port, _, _, Queue, _, _)
  412    ->  resize_pool(Queue, Workers)
  413    ;   existence_error(http_server, Port)
  414    ).
  415http_workers(Port, Workers) :-
  416    current_server(Port, _, _, Queue, _, _),
  417    aggregate_all(count, queue_worker(Queue, _Worker), Workers).
  418
  419
  420%!  http_add_worker(+Port, +Options) is det.
  421%
  422%   Add a new worker to  the  HTTP   server  for  port Port. Options
  423%   overrule the default queue  options.   The  following additional
  424%   options are processed:
  425%
  426%     - max_idle_time(+Seconds)
  427%     The created worker will automatically terminate if there is
  428%     no new work within Seconds.
  429
  430http_add_worker(Port, Options) :-
  431    must_be(ground, Port),
  432    current_server(Port, _, _, Queue, _, _),
  433    !,
  434    queue_options(Queue, QueueOptions),
  435    merge_options(Options, QueueOptions, WorkerOptions),
  436    atom_concat(Queue, '_', AliasBase),
  437    create_workers(1, 1, Queue, AliasBase, WorkerOptions).
  438http_add_worker(Port, _) :-
  439    existence_error(http_server, Port).
  440
  441
  442%!  http_current_worker(?Port, ?ThreadID) is nondet.
  443%
  444%   True if ThreadID is the identifier   of  a Prolog thread serving
  445%   Port. This predicate is  motivated  to   allow  for  the  use of
  446%   arbitrary interaction with the worker thread for development and
  447%   statistics.
  448
  449http_current_worker(Port, ThreadID) :-
  450    current_server(Port, _, _, Queue, _, _),
  451    queue_worker(Queue, ThreadID).
  452
  453
  454%!  accept_server(:Goal, +Initiator, +Options)
  455%
  456%   The goal of a small server-thread accepting new requests and
  457%   posting them to the queue of workers.
  458
  459accept_server(Goal, Initiator, Options) :-
  460    Ex = http_stop(Stopper),
  461    catch(accept_server2(Goal, Initiator, Options), Ex, true),
  462    thread_self(Thread),
  463    debug(http(stop), '[~p]: accept server received ~p', [Thread, Ex]),
  464    retract(current_server(_Port, _, Thread, Queue, _Scheme, _StartTime)),
  465    close_pending_accepts(Queue),
  466    close_server_socket(Options),
  467    thread_send_message(Stopper, http_stopped).
  468
  469accept_server2(Goal, Initiator, Options) :-
  470    thread_send_message(Initiator, server_started),
  471    repeat,
  472      (   catch(accept_server3(Goal, Options), E, true)
  473      ->  (   var(E)
  474          ->  fail
  475          ;   accept_rethrow_error(E)
  476          ->  throw(E)
  477          ;   print_message(error, E),
  478              fail
  479          )
  480      ;   print_message(error,      % internal error
  481                        goal_failed(accept_server3(Goal, Options))),
  482          fail
  483      ).
  484
  485accept_server3(Goal, Options) :-
  486    accept_hook(Goal, Options),
  487    !.
  488accept_server3(Goal, Options) :-
  489    memberchk(tcp_socket(Socket), Options),
  490    memberchk(queue(Queue), Options),
  491    debug(http(connection), 'Waiting for connection', []),
  492    tcp_accept(Socket, Client, Peer),
  493    sig_atomic(send_to_worker(Queue, Client, Goal, Peer)),
  494    http_enough_workers(Queue, accept, Peer).
  495
  496send_to_worker(Queue, Client, Goal, Peer) :-
  497    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
  498    thread_send_message(Queue, tcp_client(Client, Goal, Peer)).
  499
  500accept_rethrow_error(http_stop(_)).
  501
  502%!  close_server_socket(+Options)
  503%
  504%   Close the server socket.
  505
  506close_server_socket(Options) :-
  507    close_hook(Options),
  508    !.
  509close_server_socket(Options) :-
  510    memberchk(tcp_socket(Socket), Options),
  511    !,
  512    tcp_close_socket(Socket).
  513
  514%!  close_pending_accepts(+Queue)
  515
  516close_pending_accepts(Queue) :-
  517    (   thread_get_message(Queue, Msg, [timeout(0)])
  518    ->  close_client(Msg),
  519        close_pending_accepts(Queue)
  520    ;   true
  521    ).
  522
  523close_client(tcp_client(Client, _Goal, _0Peer)) =>
  524    debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]),
  525    tcp_close_socket(Client).
  526close_client(Msg) =>
  527    (   discard_client_hook(Msg)
  528    ->  true
  529    ;   print_message(warning, http_close_client(Msg))
  530    ).
  531
  532
  533%!  http_stop_server(+Port, +Options)
  534%
  535%   Stop the indicated  HTTP  server   gracefully.  First  stops all
  536%   workers, then stops the server.
  537%
  538%   @tbd    Realise non-graceful stop
  539
  540http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
  541    ground(Host),
  542    !,
  543    http_stop_server(Port, Options).
  544http_stop_server(Port, _Options) :-
  545    http_workers(Port, 0),                  % checks Port is ground
  546    current_server(Port, _, Thread, Queue, _Scheme, _Start),
  547    retractall(queue_options(Queue, _)),
  548    debug(http(stop), 'Signalling HTTP server thread ~p to stop', [Thread]),
  549    thread_self(Stopper),
  550    thread_signal(Thread, throw(http_stop(Stopper))),
  551    (   thread_get_message(Stopper, http_stopped, [timeout(0.1)])
  552    ->  true
  553    ;   catch(connect(localhost:Port), _, true)
  554    ),
  555    thread_join(Thread, _0Status),
  556    debug(http(stop), 'Joined HTTP server thread ~p (~p)', [Thread, _0Status]),
  557    message_queue_destroy(Queue).
  558
  559connect(Address) :-
  560    setup_call_cleanup(
  561        tcp_socket(Socket),
  562        tcp_connect(Socket, Address),
  563        tcp_close_socket(Socket)).
  564
  565%!  http_enough_workers(+Queue, +Why, +Peer) is det.
  566%
  567%   Check that we have enough workers in our queue. If not, call the
  568%   hook http:schedule_workers/1 to extend  the   worker  pool. This
  569%   predicate can be used by accept_hook/2.
  570
  571http_enough_workers(Queue, _Why, _Peer) :-
  572    message_queue_property(Queue, waiting(_0)),
  573    !,
  574    debug(http(scheduler), '~D waiting for work; ok', [_0]).
  575http_enough_workers(Queue, Why, Peer) :-
  576    message_queue_property(Queue, size(Size)),
  577    (   enough(Size, Why)
  578    ->  debug(http(scheduler), '~D in queue; ok', [Size])
  579    ;   current_server(Port, _, _, Queue, _, _),
  580        Data = _{ port:Port,
  581                  reason:Why,
  582                  peer:Peer,
  583                  waiting:Size,
  584                  queue:Queue
  585                },
  586        debug(http(scheduler), 'Asking to reschedule: ~p', [Data]),
  587        catch(http:schedule_workers(Data),
  588              Error,
  589              print_message(error, Error))
  590    ->  true
  591    ;   true
  592    ).
  593
  594enough(0, _).
  595enough(1, keep_alive).                  % I will be ready myself
  596
  597
  598%!  http:schedule_workers(+Data:dict) is semidet.
  599%
  600%   Hook called if a  new  connection   or  a  keep-alive connection
  601%   cannot be scheduled _immediately_ to a worker. Dict contains the
  602%   following keys:
  603%
  604%     - port:Port
  605%     Port number that identifies the server.
  606%     - reason:Reason
  607%     One of =accept= for a new connection or =keep_alive= if a
  608%     worker tries to reschedule itself.
  609%     - peer:Peer
  610%     Identify the other end of the connection
  611%     - waiting:Size
  612%     Number of messages waiting in the queue.
  613%     - queue:Queue
  614%     Message queue used to dispatch accepted messages.
  615%
  616%   Note that, when called with `reason:accept`,   we  are called in
  617%   the time critical main accept loop.   An  implementation of this
%   hook shall typically send the event to a thread dedicated to
  619%   dynamic worker-pool management.
  620%
  621%   @see    http_add_worker/2 may be used to create (temporary) extra
  622%           workers.
  623
  624
  625                 /*******************************
  626                 *    WORKER QUEUE OPERATIONS   *
  627                 *******************************/
  628
  629%!  create_workers(+Options)
  630%
  631%   Create the pool of HTTP worker-threads. Each worker has the
  632%   alias http_worker_N.
  633
  634create_workers(Options) :-
  635    option(workers(N), Options, 5),
  636    option(queue(Queue), Options),
  637    catch(message_queue_create(Queue), _, true),
  638    atom_concat(Queue, '_', AliasBase),
  639    create_workers(1, N, Queue, AliasBase, Options),
  640    assert(queue_options(Queue, Options)).
  641
  642create_workers(I, N, _, _, _) :-
  643    I > N,
  644    !.
  645create_workers(I, N, Queue, AliasBase, Options) :-
  646    gensym(AliasBase, Alias),
  647    thread_create(http_worker(Options), Id,
  648                  [ alias(Alias)
  649                  | Options
  650                  ]),
  651    assertz(queue_worker(Queue, Id)),
  652    I2 is I + 1,
  653    create_workers(I2, N, Queue, AliasBase, Options).
  654
  655
  656%!  resize_pool(+Queue, +Workers) is det.
  657%
  658%   Create or destroy workers. If workers   are  destroyed, the call
  659%   waits until the desired number of waiters is reached.
  660
  661resize_pool(Queue, Size) :-
  662    findall(W, queue_worker(Queue, W), Workers),
  663    length(Workers, Now),
  664    (   Now < Size
  665    ->  queue_options(Queue, Options),
  666        atom_concat(Queue, '_', AliasBase),
  667        I0 is Now+1,
  668        create_workers(I0, Size, Queue, AliasBase, Options)
  669    ;   Now == Size
  670    ->  true
  671    ;   Now > Size
  672    ->  Excess is Now - Size,
  673        thread_self(Me),
  674        forall(between(1, Excess, _), thread_send_message(Queue, quit(Me))),
  675        forall(between(1, Excess, _), thread_get_message(quitted(_)))
  676    ).
  677
  678
  679%!  http_worker(+Options)
  680%
  681%   Run HTTP worker main loop. Workers   simply  wait until they are
  682%   passed an accepted socket to process  a client.
  683%
  684%   If the message quit(Sender) is read   from the queue, the worker
  685%   stops.
  686
  687http_worker(Options) :-
  688    debug(http(scheduler), 'New worker', []),
  689    prolog_listen(this_thread_exit, done_worker),
  690    option(queue(Queue), Options),
  691    option(max_idle_time(MaxIdle), Options, infinite),
  692    thread_repeat_wait(get_work(Queue, Message, MaxIdle)),
  693      debug(http(worker), 'Waiting for a job ...', []),
  694      debug(http(worker), 'Got job ~p', [Message]),
  695      (   Message = quit(Sender)
  696      ->  !,
  697          thread_self(Self),
  698          thread_detach(Self),
  699          (   Sender == idle
  700          ->  true
  701          ;   retract(queue_worker(Queue, Self)),
  702              thread_send_message(Sender, quitted(Self))
  703          )
  704      ;   open_client(Message, Queue, Goal, In, Out,
  705                      Options, ClientOptions),
  706          (   catch(http_process(Goal, In, Out, ClientOptions),
  707                    Error, true)
  708          ->  true
  709          ;   Error = goal_failed(http_process/4)
  710          ),
  711          (   var(Error)
  712          ->  fail
  713          ;   current_message_level(Error, Level),
  714              print_message(Level, Error),
  715              memberchk(peer(Peer), ClientOptions),
  716              close_connection(Peer, In, Out),
  717              fail
  718          )
  719      ).
  720
  721get_work(Queue, Message, infinite) :-
  722    !,
  723    thread_get_message(Queue, Message).
  724get_work(Queue, Message, MaxIdle) :-
  725    (   thread_get_message(Queue, Message, [timeout(MaxIdle)])
  726    ->  true
  727    ;   Message = quit(idle)
  728    ).
  729
  730
  731%!  open_client(+Message, +Queue, -Goal, -In, -Out,
  732%!              +Options, -ClientOptions) is semidet.
  733%
  734%   Opens the connection to the client in a worker from the message
  735%   sent to the queue by accept_server/2.
  736
  737open_client(requeue(In, Out, Goal, ClOpts),
  738            _, Goal, In, Out, Opts, ClOpts) :-
  739    !,
  740    memberchk(peer(Peer), ClOpts),
  741    option(keep_alive_timeout(KeepAliveTMO), Opts, 2),
  742    check_keep_alive_connection(In, KeepAliveTMO, Peer, In, Out).
  743open_client(Message, Queue, Goal, In, Out, Opts,
  744            [ pool(client(Queue, Goal, In, Out)),
  745              timeout(Timeout)
  746            | Options
  747            ]) :-
  748    catch(open_client(Message, Goal, In, Out, Options, Opts),
  749          E, report_error(E)),
  750    option(timeout(Timeout), Opts, 60),
  751    (   debugging(http(connection))
  752    ->  memberchk(peer(Peer), Options),
  753        debug(http(connection), 'Opened connection from ~p', [Peer])
  754    ;   true
  755    ).
  756
  757
  758%!  open_client(+Message, +Goal, -In, -Out,
  759%!              -ClientOptions, +Options) is det.
  760
  761open_client(Message, Goal, In, Out, ClientOptions, Options) :-
  762    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
  763    !.
  764open_client(tcp_client(Socket, Goal, Peer), Goal, In, Out,
  765            [ peer(Peer),
  766              protocol(http)
  767            ], _) :-
  768    tcp_open_socket(Socket, In, Out).
  769
  770report_error(E) :-
  771    print_message(error, E),
  772    fail.
  773
  774
  775%!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
  776%
  777%   Wait for the client for at most  TimeOut seconds. Succeed if the
  778%   client starts a new request within   this  time. Otherwise close
  779%   the connection and fail.
  780
  781check_keep_alive_connection(In, TMO, Peer, In, Out) :-
  782    stream_property(In, timeout(Old)),
  783    set_stream(In, timeout(TMO)),
  784    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
  785    catch(peek_code(In, Code), E, true),
  786    (   var(E),                     % no exception
  787        Code \== -1                 % no end-of-file
  788    ->  set_stream(In, timeout(Old)),
  789        debug(http(keep_alive), '\tre-using keep-alive connection', [])
  790    ;   (   Code == -1
  791        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
  792        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
  793        ),
  794        close_connection(Peer, In, Out),
  795        fail
  796    ).
  797
  798
  799%!  done_worker
  800%
  801%   Called when worker is terminated  due   to  http_workers/2  or a
  802%   (debugging) exception. In  the   latter  case, recreate_worker/2
  803%   creates a new worker.
  804
  805done_worker :-
  806    thread_self(Self),
  807    thread_detach(Self),
  808    retract(queue_worker(Queue, Self)),
  809    thread_property(Self, status(Status)),
  810    !,
  811    (   catch(recreate_worker(Status, Queue), _, fail)
  812    ->  print_message(informational,
  813                      httpd_restarted_worker(Self))
  814    ;   done_status_message_level(Status, Level),
  815        print_message(Level,
  816                      httpd_stopped_worker(Self, Status))
  817    ).
  818done_worker :-                                  % received quit(Sender)
  819    thread_self(Self),
  820    thread_property(Self, status(Status)),
  821    done_status_message_level(Status, Level),
  822    print_message(Level,
  823                  httpd_stopped_worker(Self, Status)).
  824
  825done_status_message_level(true, silent) :- !.
  826done_status_message_level(exception('$aborted'), silent) :- !.
  827done_status_message_level(exception(unwind(abort)), silent) :- !.
  828done_status_message_level(exception(unwind(halt(_))), silent) :- !.
  829done_status_message_level(_, informational).
  830
  831
  832%!  recreate_worker(+Status, +Queue) is semidet.
  833%
  834%   Deal with the possibility  that   threads  are,  during development,
  835%   killed with abort/0. We recreate the worker to avoid that eventually
  836%   we run out of workers. If  we  are   aborted  due  to a halt/0 call,
  837%   thread_create/3 will raise a permission error.
  838%
  839%   The first clause deals with the possibility  that we cannot write to
  840%   `user_error`. This is possible when Prolog   is started as a service
  841%   using some service managers. Would be  nice   if  we  could write an
  842%   error, but where?
  843
  844recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
  845    halt(2).
  846recreate_worker(exception(Error), Queue) :-
  847    recreate_on_error(Error),
  848    queue_options(Queue, Options),
  849    atom_concat(Queue, '_', AliasBase),
  850    create_workers(1, 1, Queue, AliasBase, Options).
  851
%   recreate_on_error(?Error)
%
%   True when Error is an exception after which recreate_worker/2
%   starts a replacement worker: the two abort terms and
%   `time_limit_exceeded`.

recreate_on_error('$aborted').
recreate_on_error(unwind(abort)).
recreate_on_error(time_limit_exceeded).
  855
%!  thread_httpd:message_level(+Exception, -Level)
%
%   Determine the message level used to report exceptions that may
%   occur during server_loop/5. Being multifile, clauses can be added by
%   the application to refine error handling. Exceptions for which no
%   clause matches are handled by current_message_level/2, which
%   defaults to `error`. See also message_hook/3 for further
%   programming error handling.

:- multifile
    message_level/2.

message_level(error(io_error(read, _), _),               silent).
message_level(error(socket_error(epipe,_), _),           silent).
message_level(error(http_write_short(_Obj,_Written), _), silent).
message_level(error(timeout_error(read, _), _),          informational).
message_level(keep_alive_timeout,                        silent).
  871
  872current_message_level(Term, Level) :-
  873    (   message_level(Term, Level)
  874    ->  true
  875    ;   Level = error
  876    ).
  877
  878%!  read_remaining_request(+StartBody, +Request) is semidet.
  879%
  880%   If our handler did not read the   complete  request we must read the
  881%   remainder if we are dealing with a Keep-alive connection.
  882
  883read_remaining_request(StartBody, Request) :-
  884    memberchk(content_length(Len), Request),
  885    !,
  886    memberchk(pool(client(_Queue, _Goal, In, _Out)), Request),
  887    byte_count(In, Here),
  888    Left is StartBody+Len-Here,
  889    read_incomplete(In, Left).
  890read_remaining_request(_, _Request).
  891
  892read_incomplete(_, 0) :-
  893    !.
  894read_incomplete(In, Left) :-
  895    % Left < 1 000 000,			% Optionally close anyway.
  896    catch(setup_call_cleanup(
  897              open_null_stream(Null),
  898              copy_stream_data(In, Null, Left),
  899              close(Null)),
  900          error(_,_),
  901          fail).
  902
  903%!  http_requeue(+Header)
  904%
  905%   Re-queue a connection to  the  worker   pool.  This  deals  with
  906%   processing additional requests on keep-alive connections.
  907
  908http_requeue(Header) :-
  909    requeue_header(Header, ClientOptions),
  910    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
  911    memberchk(peer(Peer), ClientOptions),
  912    http_enough_workers(Queue, keep_alive, Peer),
  913    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
  914    !.
  915http_requeue(Header) :-
  916    debug(http(error), 'Re-queue failed: ~p', [Header]),
  917    fail.
  918
  919requeue_header([], []).
  920requeue_header([H|T0], [H|T]) :-
  921    requeue_keep(H),
  922    !,
  923    requeue_header(T0, T).
  924requeue_header([_|T0], T) :-
  925    requeue_header(T0, T).
  926
%   requeue_keep(?Field)
%
%   Request fields that requeue_header/2 carries over to the next
%   request on the same connection.

requeue_keep(pool(_)).
requeue_keep(peer(_)).
requeue_keep(protocol(_)).
  930
  931
%!  http_process(:Goal, +In, +Out, +Options)
%
%   Handle a single client request on the stream pair In/Out. Both
%   streams get the timeout from the option `timeout(TMO)` (default 60).
%   The request is then run through http_wrapper/5, which also returns
%   the parsed Request and the byte count at the start of the body.
%   Finally, next/3 decides what to do with the connection based on the
%   Connection value returned by the wrapper.

http_process(Goal, In, Out, Options) :-
    debug(http(server), 'Running server goal ~p on ~p -> ~p',
          [Goal, In, Out]),
    option(timeout(TMO), Options, 60),
    set_stream(In, timeout(TMO)),
    set_stream(Out, timeout(TMO)),
    http_wrapper(Goal, In, Out, Connection,
                 [ request(Request),
                   byte_count(StartBody)
                 | Options
                 ]),
    next(Connection, StartBody, Request).
  948
  949next(Connection, StartBody, Request) :-
  950    next_(Connection, StartBody, Request), !.
  951next(Connection, StartBody, Request) :-
  952    print_message(warning, goal_failed(next(Connection,StartBody,Request))).
  953
%   next_(+Connection, +StartBody, +Request)
%
%   Act on the Connection value of a completed request:
%
%     - switch_protocol(SwitchGoal, _)
%       Call SwitchGoal on the connection streams. If it fails or
%       raises an exception (which is printed as an error), the
%       connection is closed.
%     - spawned(ThreadId)
%       The request was handed to another thread; only a debug
%       message is emitted here.
%     - Keep-alive (compared case-insensitively)
%       Read the unread part of the request body and re-queue the
%       connection.
%     - Anything else, or if keep-alive handling fails
%       Close the connection.

next_(switch_protocol(SwitchGoal, _SwitchOptions), _, Request) :-
    !,
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    (   catch(call(SwitchGoal, In, Out), E,
              (   print_message(error, E),
                  fail))
    ->  true
    ;   http_close_connection(Request)
    ).
next_(spawned(ThreadId), _, _) :-
    !,
    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
next_(Connection, StartBody, Request) :-
    downcase_atom(Connection, 'keep-alive'),
    read_remaining_request(StartBody, Request),
    http_requeue(Request),
    !.
next_(_, _, Request) :-                 % fallback: drop the connection
    http_close_connection(Request).
  973
  974
  975%!  http_close_connection(+Request)
  976%
  977%   Close connection associated to Request.  See also http_requeue/1.
  978
  979http_close_connection(Request) :-
  980    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
  981    memberchk(peer(Peer), Request),
  982    close_connection(Peer, In, Out).
  983
%!  close_connection(+Peer, +In, +Out)
%
%   Closes the connection from the server to the client. Both streams
%   are closed with `force(true)` and any exception raised while closing
%   is currently silently ignored. Peer is only used for the debug
%   message on topic `http(connection)`.

close_connection(Peer, In, Out) :-
    debug(http(connection), 'Closing connection from ~p', [Peer]),
    catch(close(In, [force(true)]), _, true),
    catch(close(Out, [force(true)]), _, true).
  993
%!  http_spawn(:Goal, +Options) is det.
%
%   Continue this connection on a  new   thread.  A handler may call
%   http_spawn/2 to start a new thread that continues processing the
%   current request using Goal. The original   thread returns to the
%   worker pool for processing new requests.   Options are passed to
%   thread_create/3, except for:
%
%       * pool(+Pool)
%       Interfaces to library(thread_pool), starting the thread
%       on the given pool.
%
%   If a pool does not exist, this predicate calls create_pool/1,
%   which consults the multifile hook http:create_pool/1. If
%   create_pool/1 succeeds the operation is retried.
%
%   @throws http_reply(busy) if the pool reports
%           `resource_error(threads_in_pool(_))`.

http_spawn(Goal, Options) :-
    select_option(pool(Pool), Options, ThreadOptions),
    !,
    current_output(CGI),
    Error = error(Formal, _),
    % Catch only error(_,_) exceptions; Formal stays unbound on success.
    catch(thread_create_in_pool(Pool,
                                wrap_spawned(CGI, Goal), Id,
                                [ detached(true)
                                | ThreadOptions
                                ]),
          Error,
          true),
    (   var(Formal)                     % thread was created
    ->  http_spawned(Id)
    ;   Formal = resource_error(threads_in_pool(_))
    ->  throw(http_reply(busy))
    ;   Formal = existence_error(thread_pool, Pool),
        create_pool(Pool)
    ->  http_spawn(Goal, Options)       % pool now exists: retry
    ;   throw(Error)                    % anything else is re-thrown
    ).
http_spawn(Goal, Options) :-            % no pool(_) option: plain thread
    current_output(CGI),
    thread_create(wrap_spawned(CGI, Goal), Id,
                  [ detached(true)
                  | Options
                  ]),
    http_spawned(Id).
 1038
%   wrap_spawned(+CGI, :Goal)
%
%   Goal run by the thread created in http_spawn/2. It makes the CGI
%   stream of the original request the current output, fetches the
%   request from that stream, records the current byte count of the
%   request's input stream, runs Goal through http_wrap_spawned/3 and
%   finally hands the connection to next/3.

wrap_spawned(CGI, Goal) :-
    set_output(CGI),
    cgi_property(CGI, request(Request)),
    memberchk(input(Input), Request),
    byte_count(Input, StartBody),
    http_wrap_spawned(Goal, Request, Connection),
    next(Connection, StartBody, Request).
 1046
 1047%!  create_pool(+Pool)
 1048%
 1049%   Lazy  creation  of  worker-pools  for   the  HTTP  server.  This
 1050%   predicate calls the hook http:create_pool/1.   If the hook fails
 1051%   it creates a default pool of size   10. This should suffice most
 1052%   typical usecases. Note that we  get   a  permission error if the
 1053%   pool is already created.  We can ignore this.
 1054
 1055create_pool(Pool) :-
 1056    E = error(permission_error(create, thread_pool, Pool), _),
 1057    catch(http:create_pool(Pool), E, true).
 1058create_pool(Pool) :-
 1059    print_message(informational, httpd(created_pool(Pool))),
 1060    thread_pool_create(Pool, 10, []).
 1061
 1062
 1063		 /*******************************
 1064		 *         WAIT POLICIES	*
 1065		 *******************************/
 1066
:- meta_predicate
    thread_repeat_wait(0).

%!  thread_repeat_wait(:Goal) is multi.
%
%   Acts as `repeat,  thread_idle(Goal)`,  choosing   whether  to  use a
%   `long` or `short` idle time based on the average firing rate. When
%   the average time between activations is classified as `brief` by
%   long/2, Goal is called directly, without thread_idle/2.

thread_repeat_wait(Goal) :-
    new_rate_mma(5, 1000, State),       % N=5, millisecond resolution
    repeat,
      notrace,
      update_rate_mma(State, MMA),      % MMA: average interval in seconds
      long(MMA, IsLong),
      (   IsLong == brief
      ->  call(Goal)
      ;   thread_idle(Goal, IsLong)
      ).
 1085
 1086long(MMA, brief) :-
 1087    MMA < 0.05,
 1088    !.
 1089long(MMA, short) :-
 1090    MMA < 1,
 1091    !.
 1092long(_, long).
 1093
%!  new_rate_mma(+N, +Resolution, -State) is det.
%!  update_rate_mma(!State, -MMA) is det.
%
%   Implement _Modified Moving  Average_  computing   the  average  time
%   between requests as an exponential moving average with alpha=1/N.
%
%   State is a term rstate(Base, Last, MaxI, Resolution, N, MMA), where
%   Base is the wall time that acts as time zero, Last is the stamp of
%   the previous update relative to Base, MaxI is the value of the
%   Prolog flag `max_tagged_integer` and MMA is the current average.
%   Last and MMA start at 0.
%
%   @arg Resolution is the time resolution  in 1/Resolution seconds. All
%   storage is done in integers to avoid  the need for stack freezing in
%   nb_setarg/3.
%
%   @see https://en.wikipedia.org/wiki/Moving_average

new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :-
    current_prolog_flag(max_tagged_integer, MaxI),
    get_time(Base).
 1109
 1110update_rate_mma(State, MMAr) :-
 1111    State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
 1112    get_time(Now),
 1113    Stamp is round((Now-Base)*Resolution),
 1114    (   Stamp > MaxI
 1115    ->  nb_setarg(1, State, Now),
 1116        nb_setarg(2, State, 0)
 1117    ;   true
 1118    ),
 1119    Diff is Stamp-Last,
 1120    nb_setarg(2, State, Stamp),
 1121    MMA is round(((N-1)*MMA0+Diff)/N),
 1122    nb_setarg(6, State, MMA),
 1123    MMAr is MMA/float(Resolution).
 1124
 1125
 1126                 /*******************************
 1127                 *            MESSAGES          *
 1128                 *******************************/
 1129
:- multifile
    prolog:message/3.

%   Message texts for the terms printed by this library: server
%   start-up, workers that stopped or were replaced and the lazy
%   creation of a default thread pool.

prolog:message(httpd_started_server(Port, Options)) -->
    [ 'Started server at '-[] ],
    http_root(Port, Options).
prolog:message(httpd_stopped_worker(Self, Status)) -->
    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
prolog:message(httpd_restarted_worker(Self)) -->
    [ 'Replaced aborted worker ~p'-[Self] ].
prolog:message(httpd(created_pool(Pool))) -->
    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
      'Create this pool at startup-time or define the hook ', nl,
      'http:create_pool/1 to avoid this message and create a ', nl,
      'pool that fits the usage-profile.'
    ].
 1146
%   http_root(+Address, +Options)//
%
%   Emit the landing page of the server at Address, as computed by
%   landing_page/3, as a `url(URI)` message element.

http_root(Address, Options) -->
    { landing_page(Address, URI, Options) },
    [ url(URI) ].
 1150
%   landing_page(+Address, -URI, +Options)
%
%   URI describes where the server at Address can be reached. Address
%   is one of:
%
%     - Host:Port
%       Host must be an atom and Port an integer. The scheme is taken
%       from http_server_property/2 and the port is omitted from the
%       URI if it is the default port for that scheme. The result is
%       resolved against the entry page by entry_page/3.
%     - unix_socket(Path)
%       URI is a string describing the socket rather than a real URI.
%     - Port
%       Handled as `localhost:Port`.

landing_page(Host:Port, URI, Options) :-
    !,
    must_be(atom, Host),
    must_be(integer, Port),
    http_server_property(Port, scheme(Scheme)),
    (   default_port(Scheme, Port)
    ->  format(atom(Base), '~w://~w', [Scheme, Host])
    ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
    ),
    entry_page(Base, URI, Options).
landing_page(unix_socket(Path), URI, _Options) :-
    !,
    format(string(URI), 'Unix domain socket "~w"', [Path]).
landing_page(Port, URI, Options) :-
    landing_page(localhost:Port, URI, Options).
 1166
%   default_port(?Scheme, ?Port)
%
%   Port is the port that landing_page/3 leaves out of the URI for
%   Scheme.

default_port(http, 80).
default_port(https, 443).
 1169
 1170entry_page(Base, URI, Options) :-
 1171    option(entry_page(Entry), Options),
 1172    !,
 1173    uri_resolve(Entry, Base, URI).
 1174entry_page(Base, URI, _) :-
 1175    http_absolute_location(root(.), Entry, []),
 1176    uri_resolve(Entry, Base, URI)