36:- module(http_dyn_workers,
37 [
38 ]). 39:- use_module(library(http/thread_httpd)). 40:- use_module(library(debug)). 41:- use_module(library(settings)). 42:- use_module(library(aggregate)). 43
% Tunable limits of the dynamic worker scheduler. All three live in the
% `http` settings namespace and are read by the scheduler at run time
% using setting/2.

% Upper bound on the number of workers; no worker is added beyond it.
:- setting(http:max_workers,
           integer,
           100,
           "Maximum number of workers to create").

% Passed as max_idle_time(Limit) to every worker added by the scheduler.
:- setting(http:worker_idle_limit,
           number,
           10,
           "Terminate a dynamic worker when idle for this time").

% No worker is added while the smoothed load average exceeds this value.
:- setting(http:max_load,
           number,
           10,
           "Maximum load average caused by HTTP workers").
:- multifile http:schedule_workers/1.

%!  http:schedule_workers(+Dict) is semidet.
%
%   Hook implementation that hands a "no workers available" report to
%   the scheduler thread. The report is posted as the message
%   no_workers(Now, Dict) to the thread aliased `__http_scheduler`,
%   where Now is the time of posting. Dict is forwarded unchanged; the
%   scheduler reads its `port` and `waiting` keys and, when present,
%   its `queue` key.
%
%   If the scheduler's message queue does not exist, the scheduler
%   thread is created and the report is posted again. The hook fails
%   if the scheduler thread cannot be created.

http:schedule_workers(Dict) :-
    get_time(Now),
    (   catch(thread_send_message('__http_scheduler',
                                  no_workers(Now, Dict)),
              error(existence_error(message_queue, _), _),
              fail)
    ->  !                               % delivered: commit to this clause
    ;   create_scheduler,               % no scheduler yet: start one
        http:schedule_workers(Dict)     % and post the report again
    ).
%!  create_scheduler is semidet.
%
%   Start the scheduler as a detached thread with the alias
%   `__http_scheduler`, running http_scheduler/0. Any error(_,_)
%   exception raised by thread_create/3 is turned into failure.

create_scheduler :-
    ThreadOptions = [ alias('__http_scheduler'),
                      inherit_from(main),
                      debug(false),
                      detached(true)
                    ],
    catch(thread_create(http_scheduler, _ThreadId, ThreadOptions),
          error(_, _),
          fail).
%!  http_scheduler
%
%   Entry point of the scheduler thread. Builds the initial state dict,
%   holding a `waiting` count of 0 and the start time, and enters the
%   scheduler loop http_scheduler/1.

http_scheduler :-
    get_time(Started),
    InitialState = _{waiting:0, time:Started},
    http_scheduler(InitialState).
%!  http_scheduler(+State)
%
%   Scheduler loop; does not return. Each iteration waits up to 10
%   seconds for a message on the calling thread's queue. If none
%   arrives, the task `update_load_avg` is used instead. The task is
%   handed to reschedule/3. If that succeeds the loop continues with
%   the state it produced. If it fails, or raises an exception (which
%   is printed as a warning), the loop continues with the old state.

http_scheduler(State0) :-
    thread_self(Self),
    (   thread_get_message(Self, Message, [timeout(10)])
    ->  Task = Message
    ;   Task = update_load_avg          % timed out: refresh the load only
    ),
    (   catch(reschedule(Task, State0, State1),
              Error,
              ( print_message(warning, Error),
                fail
              ))
    ->  NextState = State1
    ;   NextState = State0              % failure or error: keep old state
    ),
    http_scheduler(NextState).
%!  reschedule(+Task, +State0, -State) is semidet.
%
%   Handle one scheduler task.
%
%     - no_workers(Reported, Dict)
%       Refresh the load average, then decide whether to add a worker
%       to the server on `Dict.port`:
%         1. If the load exceeds setting `http:max_load`, do nothing.
%         2. If the number of current workers has reached setting
%            `http:max_workers`, do nothing.
%         3. Otherwise sleep. The base wait is
%            0.001*MaxWorkers/max(1, MaxWorkers-Workers) seconds counted
%            from Reported, so it grows as the pool fills up. The sleep
%            is never shorter than 0.001 seconds.
%         4. After sleeping, add one worker if the accept queue is still
%            non-empty. The worker gets max_idle_time set from setting
%            `http:worker_idle_limit`.
%     - update_load_avg
%       Only refresh the load average.
%
%   Fails for any other task and when the accept queue cannot be found.

reschedule(no_workers(ReportedAt, Server), State0, State) :-
    update_load_avg(Server, State0, State, LoadAvg),
    setting(http:max_load, LoadLimit),
    (   LoadAvg > LoadLimit
    ->  debug(http(scheduler), 'Load ~1f > ~1f; not adding workers',
              [ LoadAvg, LoadLimit ])
    ;   aggregate_all(count, http_current_worker(Server.port, _), Active),
        setting(http:max_workers, Limit),
        (   Active >= Limit
        ->  debug(http(scheduler),
                  'Reached max workers (~D); not adding workers',
                  [ Limit ])
        ;   Headroom is max(1, Limit-Active),
            Delay is 0.001*(Limit/Headroom),
            get_time(Now),
            % Part of the delay may already have passed since the report.
            Pause is max(0.001, Delay + ReportedAt-Now),
            debug(http(scheduler),
                  'Waiting: ~w; active: ~w; sleep: ~3f; load: ~1f',
                  [Server.waiting, Active, Pause, LoadAvg]),
            sleep(Pause),
            accept_queue(Server, Queue),
            message_queue_property(Queue, size(Backlog)),
            (   Backlog == 0
            ->  debug(http(scheduler), 'Drained', [])
            ;   debug(http(scheduler), 'Size is ~w: adding worker',
                      [Backlog]),
                setting(http:worker_idle_limit, IdleLimit),
                http_add_worker(Server.port,
                                [ max_idle_time(IdleLimit)
                                ])
            )
        )
    ).
reschedule(update_load_avg, State0, State) :-
    update_load_avg(_{}, State0, State, _LoadAvg).
%!  update_load_avg(+Dict, +State0, -State, -Load) is det.
%
%   Load is the smoothed load average caused by the HTTP workers and
%   State is State0 with an updated `load` entry.
%
%     1. If State0 holds a load that was computed less than 10 seconds
%        ago, that value is reused and the state is unchanged.
%     2. Otherwise, if the server port is known (from State0 or Dict),
%        the CPU time of all current workers is summed. Load is the
%        increase since the previous sample divided by the elapsed wall
%        time, smoothed by smooth_load/3. The first sample only records
%        the baseline and yields load 0.
%     3. If the port is not known, Load is 0 and the state is unchanged.
%
%   @arg Dict   Server dict; only its `port` key is consulted, and only
%               when State0 has no port yet.
%   @arg State0 Scheduler state dict.
%   @arg State  Updated scheduler state dict.
%   @arg Load   Load average; 0 if it cannot be determined (yet).

update_load_avg(_Dict, State, State, Load) :-
    _{stamp:Last, load:Load} :< State.get(load),
    get_time(Now),
    Now - Last < 10.
update_load_avg(Dict, State0, State, Load) :-
    server_port(Dict, State0, State1, Port),
    !,
    aggregate_all(sum(CPU), worker_cpu(Port, CPU), CPU1),
    get_time(Now),
    (   LoadDict = State1.get(load),
        _{stamp:Last, cpu:LastCPU} :< LoadDict
    ->  % The sum covers only the workers that exist right now, so it
        % drops when workers exit between two samples. Clamp at 0 so an
        % exiting worker cannot produce a negative load.
        Load0 is max(0, (CPU1-LastCPU)/(Now-Last)),
        smooth_load(LoadDict, Load0, Load),
        State = State1.put(load, _{stamp:Now, cpu:CPU1, load:Load})
    ;   State = State1.put(load, _{stamp:Now, cpu:CPU1}),
        Load = 0
    ).
% Port unknown: hand the state back unchanged. Leaving it unbound would
% make the scheduler loop continue with an unbound state.
update_load_avg(_, State, State, 0).
%!  worker_cpu(+Port, -CPU) is nondet.
%
%   CPU is the `cputime` statistic of a current worker of the server on
%   Port. Enumerates all workers on backtracking. A worker for which
%   thread_statistics/3 raises an exception is skipped.

worker_cpu(Port, CPU) :-
    http_current_worker(Port, Worker),
    catch(thread_statistics(Worker, cputime, CPU),
          _AnyException,
          fail).
%!  server_port(+Dict, +State0, -State, -Port) is semidet.
%
%   Port is the server port the scheduler works for. A port already
%   stored in State0 wins and the state is returned unchanged.
%   Otherwise the port is taken from Dict and recorded in State. Fails
%   if neither dict has a `port` key.

server_port(Dict, State0, State, Port) :-
    (   State = State0,
        Port = State0.get(port)
    ->  true
    ;   Port = Dict.get(port),
        State = State0.put(port, Port)
    ).
%!  smooth_load(+LoadDict, +Load0, -Load) is det.
%
%   Load is the new sample Load0 blended with the previous load stored
%   under key `load` of LoadDict, weighting the previous value 5 to 1:
%   Load = (5*Previous + Load0)/6. Without a previous load, Load is
%   simply Load0.

smooth_load(LoadDict, Load0, Load) :-
    (   Previous = LoadDict.get(load)
    ->  Load is (5*Previous+Load0)/6
    ;   Load = Load0
    ).
%!  accept_queue(+Dict, -Queue) is semidet.
%
%   Queue is the message queue whose size the scheduler inspects before
%   adding a worker. It is taken from the `queue` key of Dict when
%   present. Otherwise it is looked up through
%   thread_httpd:current_server/6 using `Dict.port`. Fails if the queue
%   cannot be determined either way.

accept_queue(Dict, Queue) :-
    Queue = Dict.get(queue),
    !.
accept_queue(Dict, Queue) :-
    thread_httpd:current_server(Dict.port, _, _, Queue, _, _),
    !.
Dynamically schedule HTTP workers.
Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.
This module defines hooks into the HTTP framework to dynamically schedule worker threads. Dynamic scheduling relieves us from finding a good value for the size of the HTTP worker pool.
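The decision to add a worker follows these rules: (1) if the smoothed load average caused by the workers exceeds the setting `http:max_load`, no worker is added; (2) if the server already has `http:max_workers` workers, no worker is added; (3) otherwise the scheduler waits briefly, and longer as the pool approaches its maximum size, and adds one worker only if the accept queue is still non-empty after the wait.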
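The policy depends on three settings: `http:max_workers` (integer, default 100), the maximum number of workers to create; `http:worker_idle_limit` (number, default 10), the idle time after which a dynamic worker terminates; and `http:max_load` (number, default 10), the maximum load average caused by HTTP workers.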