35
36:- module(http_dyn_workers,
37 [
38 ]). 39:- use_module(library(http/thread_httpd)). 40:- use_module(library(debug)). 41:- use_module(library(settings)). 42:- use_module(library(aggregate)). 43
% Settings that tune the dynamic worker policy.  reschedule/3 reads
% all three: `max_load` and `max_workers` decide whether a worker may
% be added and `worker_idle_limit` is handed to http_add_worker/2 as
% the max_idle_time/1 option.

:- setting(http:max_workers,
           integer,
           100,
           "Maximum number of workers to create").
:- setting(http:worker_idle_limit,
           number,
           10,
           "Terminate a dynamic worker when idle for this time").
:- setting(http:max_load,
           number,
           10,
           "Maximum load average caused by HTTP workers").
:- multifile http:schedule_workers/1.

%!  http:schedule_workers(+Dict) is semidet.
%
%   Implementation of the multifile hook http:schedule_workers/1.
%   Posts the message no_workers(Stamp, Dict), where Stamp is the
%   current time, to the thread with alias '__http_scheduler'.  When
%   that thread's message queue does not exist, create_scheduler/0 is
%   called and the message is posted again.  Fails if the scheduler
%   thread cannot be created.

http:schedule_workers(Data) :-
    get_time(Stamp),
    Message = no_workers(Stamp, Data),
    catch(thread_send_message('__http_scheduler', Message),
          error(existence_error(message_queue, _), _),
          fail),
    !.
http:schedule_workers(Data) :-
    % No scheduler queue yet: start the scheduler and retry.
    create_scheduler,
    http:schedule_workers(Data).
100
%!  create_scheduler is semidet.
%
%   Start a detached thread with alias '__http_scheduler' that runs
%   http_scheduler/0.  Any error(_,_) exception raised by
%   thread_create/3 is turned into failure.

create_scheduler :-
    Options = [ alias('__http_scheduler'),
                inherit_from(main),
                debug(false),
                detached(true)
              ],
    catch(thread_create(http_scheduler, _, Options),
          error(_, _),
          fail).
110
%!  http_scheduler
%
%   Entry point of the scheduler thread.  Builds the initial state
%   dict (`waiting` is 0, `time` is the current time) and enters the
%   message loop http_scheduler/1.

http_scheduler :-
    get_time(Started),
    InitialState = _{waiting:0, time:Started},
    http_scheduler(InitialState).
116
%!  http_scheduler(+State)
%
%   Message loop of the scheduler thread.  Waits up to 10 seconds for
%   a message on the thread's own queue; when the wait times out the
%   task `update_load_avg` is used instead.  The task is handed to
%   reschedule/3.  If that succeeds, the loop continues with the new
%   state.  If it fails or raises an exception (which is printed as a
%   warning), the loop continues with the unchanged state.

http_scheduler(State0) :-
    thread_self(Self),
    (   thread_get_message(Self, Message, [timeout(10)])
    ->  Task = Message
    ;   Task = update_load_avg
    ),
    (   catch(reschedule(Task, State0, State1),
              Error,
              (   print_message(warning, Error),
                  fail
              ))
    ->  NextState = State1
    ;   NextState = State0
    ),
    http_scheduler(NextState).
%!  reschedule(+Task, +State0, -State)
%
%   Handle one scheduler task.
%
%     - no_workers(ReportedAt, Request)
%       Refresh the load average, then decide whether to add a worker
%       for `Request.port`.  No worker is added when the load exceeds
%       the setting `http:max_load` or when the number of current
%       workers has reached `http:max_workers`.  Otherwise sleep for
%       a delay that grows as the number of workers approaches the
%       maximum (counted from ReportedAt, at least 1ms) and add a
%       worker only if the accept queue is still non-empty.
%     - update_load_avg
%       Only refresh the load average kept in the state.

reschedule(no_workers(ReportedAt, Request), State0, State) :-
    update_load_avg(Request, State0, State, Load),
    setting(http:max_load, LoadLimit),
    (   Load > LoadLimit
    ->  debug(http(scheduler), 'Load ~1f > ~1f; not adding workers',
              [ Load, LoadLimit ])
    ;   aggregate_all(count, http_current_worker(Request.port, _), Active),
        setting(http:max_workers, Limit),
        (   Active >= Limit
        ->  debug(http(scheduler),
                  'Reached max workers (~D); not adding workers',
                  [ Limit ])
        ;   % The fewer free slots remain, the longer we wait.
            Headroom is max(1, Limit-Active),
            Wait is 0.001*(Limit/Headroom),
            get_time(Now),
            Sleep is max(0.001, Wait + ReportedAt-Now),
            debug(http(scheduler),
                  'Waiting: ~w; active: ~w; sleep: ~3f; load: ~1f',
                  [Request.waiting, Active, Sleep, Load]),
            sleep(Sleep),
            accept_queue(Request, Queue),
            message_queue_property(Queue, size(Pending)),
            (   Pending == 0
            ->  debug(http(scheduler), 'Drained', [])
            ;   debug(http(scheduler), 'Size is ~w: adding worker', [Pending]),
                setting(http:worker_idle_limit, IdleLimit),
                http_add_worker(Request.port,
                                [ max_idle_time(IdleLimit)
                                ])
            )
        )
    ).
reschedule(update_load_avg, State0, State) :-
    update_load_avg(_{}, State0, State, _).
167
%!  update_load_avg(+Dict, +State0, -State, -Load) is det.
%
%   Maintain the load average kept under the `load` key of the
%   scheduler state and return the current value as Load.
%
%     1. If the state holds a load value whose stamp is less than 10
%        seconds old, that value is returned and the state is left
%        unchanged.  This clause commits, so a failure later in the
%        caller does not backtrack into a recomputation.
%     2. Otherwise, if the server port is known (see server_port/4),
%        the CPU time of all workers on that port is summed.  Given a
%        previous sample, the raw load is the CPU time consumed since
%        that sample divided by the elapsed time, smoothed by
%        smooth_load/3.  Without a previous sample only the stamp and
%        CPU time are stored and Load is 0.
%     3. If no port is known, Load is 0 and the state is passed
%        through unchanged, so State is always bound on exit.
%
%   @arg Dict is the request dict; only its `port` key is consulted.

update_load_avg(_Dict, State, State, Load) :-
    _{stamp:Last, load:Load} :< State.get(load),
    get_time(Now),
    Now - Last < 10,
    !.
update_load_avg(Dict, State0, State, Load) :-
    server_port(Dict, State0, State1, Port),
    !,
    aggregate_all(sum(CPU), worker_cpu(Port, CPU), CPU1),
    get_time(Now),
    (   LoadDict = State1.get(load),
        _{stamp:Last, cpu:LastCPU} :< LoadDict
    ->  Load0 is (CPU1-LastCPU)/(Now-Last),
        smooth_load(LoadDict, Load0, Load),
        State = State1.put(load, _{stamp:Now, cpu:CPU1, load:Load})
    ;   State = State1.put(load, _{stamp:Now, cpu:CPU1}),
        Load = 0
    ).
update_load_avg(_, State, State, 0).
186
%!  worker_cpu(+Port, -CPU) is nondet.
%
%   On backtracking, CPU is the `cputime` statistic of each worker
%   thread reported by http_current_worker/2 for Port.  A worker for
%   which thread_statistics/3 raises an exception is skipped.

worker_cpu(Port, CPU) :-
    http_current_worker(Port, Worker),
    catch(thread_statistics(Worker, cputime, CPU),
          _AnyError,
          fail).
190
%!  server_port(+Dict, +State0, -State, -Port) is semidet.
%
%   Port is the server port.  A port already stored in the state takes
%   precedence and leaves the state unchanged.  Otherwise the port is
%   taken from Dict and stored in the state under the key `port`.
%   Fails if neither dict has a `port` key.

server_port(_Dict, State, State, Port) :-
    State.get(port) = Port,
    !.
server_port(Dict, State0, State, Port) :-
    Dict.get(port) = Port,
    State0.put(port, Port) = State.
197
%!  smooth_load(+LoadDict, +Load0, -Load) is semidet.
%
%   Blend the new sample Load0 with the previous load stored under
%   the `load` key of LoadDict: the previous value weighs 5/6 and the
%   new sample 1/6.  Without a previous value Load is simply Load0.

smooth_load(LoadDict, Sample, Load) :-
    (   Previous = LoadDict.get(load)
    ->  Load is (5*Previous+Sample)/6
    ;   Load = Sample
    ).
%!  accept_queue(+Dict, -Queue) is semidet.
%
%   Queue is the message queue to inspect for pending work.  The
%   `queue` key of Dict is used when present.  Otherwise the queue is
%   looked up through thread_httpd:current_server/6 using `Dict.port`.

accept_queue(Dict, Queue) :-
    Queue = Dict.get(queue),
    !.
accept_queue(Dict, Queue) :-
    thread_httpd:current_server(Dict.port, _, _, Queue, _, _),
    !.
Dynamically schedule HTTP workers.
Most code doesn't need to use this directly; instead use library(http/http_server), which combines this library with the typical HTTP libraries that most servers need.
This module defines hooks into the HTTP framework to dynamically schedule worker threads. Dynamic scheduling relieves us from finding a good value for the size of the HTTP worker pool.
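The decision to add a worker follows these rules:
  - If the load average caused by the HTTP workers exceeds the setting `http:max_load`, no worker is added.
  - If the number of workers has reached the setting `http:max_workers`, no worker is added.
  - Otherwise the scheduler waits for a short time, which grows as the number of workers approaches the maximum, and adds one worker only if the accept queue still holds requests after the wait.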
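The policy depends on three settings:
  - `http:max_workers` (integer, default 100): maximum number of workers to create.
  - `http:worker_idle_limit` (number, default 10): terminate a dynamic worker when idle for this time.
  - `http:max_load` (number, default 10): maximum load average caused by HTTP workers.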