18
19:- module(swipe,
20 [ run/1
21 , command/2
22 , command/3
23 , with_pipe_output/3
24 , with_pipe_input/3
25 , with_pipe_io/3
26 , shell_quote//1
27 , op(300,xfy,:>)
28 , op(300,yfx,>:)
29 , op(200,fy,@)
30 ]).
% The third argument of each with_pipe_* predicate is a goal (meta-argument
% specifier 0), so it is resolved relative to the caller's module.
:- meta_predicate with_pipe_output(-,+,0).
:- meta_predicate with_pipe_input(-,+,0).
:- meta_predicate with_pipe_io(-,+,0).

% Clauses for def/2 may be contributed by other files.
:- multifile def/2.
147:- use_module(library(dcg_core)). 148:- use_module(library(dcg_codes)). 149:- use_module(library(fileutils)). 150:- use_module(library(settings)). 151
152:- set_prolog_flag(double_quotes,string). 153:- set_prolog_flag(back_quotes,codes). 154
155:- setting(quote_method,oneof([strong,weak]),strong,"Filename quoting method"). 156
%!  def(?Name, ?Pipeline)
%
%   Table of named pipeline abbreviations. pipe//2 consults this table
%   when a pipeline term matches none of its built-in forms, and then
%   continues with Pipeline. Declared multifile, so other files may add
%   their own clauses.
%
%   Built-in entries:
%     * cat          - `cat` with the same stream type on both sides
%     * cat(File^Ty) - `cat` of a readable file, producing a stream of type Ty
%     * echo(Txt^Ty) - `echo` of a shell-quoted term, producing type Ty
def(cat,
    sh($Type >> $Type, "cat")).
def(cat(File^Type),
    sh(0 >> $Type, "cat ~s", [File+read])).
def(echo(Text^Type),
    sh(0 >> $Type, "echo ~s", [@Text])).
160
%!  ppipe(+Pipeline, ?Type)//
%
%   Emits the shell text for Pipeline (as produced by pipe//2) wrapped
%   in a pair of parentheses.
ppipe(Pipeline, Type) -->
   "(",
   pipe(Pipeline, Type),
   ")".
%!  pipe(+Pipeline, ?Type)//
%
%   Emits the shell command text for Pipeline while relating it to its
%   stream type Type, written In>>Out. Each sub-pipeline is emitted in
%   parentheses via ppipe//2. Clauses are tried in order and each
%   built-in form commits with a cut:
%
%     * Left >> Right   - `Left | Right`; the output type of Left must
%                         unify with the input type of Right (u/3).
%     * File^T :> Proc  - `Proc < File`; overall input type is 0 and
%                         Proc's input stream type must unify with T.
%     * Proc >: File^T  - `Proc > File`; overall output type is 0 and
%                         Proc's output stream type must unify with T.
%     * A * B           - `A && B`; types combined by seq_types/4.
%     * A + B           - `A & B`; types combined by par_types/4.
%     * in(Dir, Proc)   - `cd Dir && Proc`, Dir resolved as a directory
%                         through abs//2.
%     * sh(T,Spec), sh(T,Fmt,Args) - primitive commands, see sh//2.
%     * anything else   - looked up in def/2 and expanded.
%
%   @throws type_mismatch(Pipeline, T1, T2) from u/3, seq_types/4 or
%           par_types/4 when stream types cannot be reconciled.
pipe(Left >> Right, In >> Out) --> !,
   ppipe(Left, In >> LeftOut),
   " | ",
   ppipe(Right, RightIn >> Out),
   { u(Left >> Right, LeftOut, RightIn) }.
pipe(File^FileType :> Proc, 0 >> Out) --> !,
   ppipe(Proc, $ProcIn >> Out),
   " < ",
   file(File, [access(read)]),
   { u(File^FileType :> Proc, FileType, ProcIn) }.
pipe(Proc >: File^FileType, In >> 0) --> !,
   ppipe(Proc, In >> $ProcOut),
   " > ",
   file(File, [access(write)]),
   { u(Proc >: File^FileType, ProcOut, FileType) }.
pipe(First * Second, Type) --> !,
   ppipe(First, FirstType),
   " && ",
   ppipe(Second, SecondType),
   { seq_types(First * Second, FirstType, SecondType, Type) }.
pipe(First + Second, Type) --> !,
   ppipe(First, FirstType),
   " & ",
   ppipe(Second, SecondType),
   { par_types(First + Second, FirstType, SecondType, Type) }.
pipe(in(Dir, Proc), Type) --> !,
   "cd ",
   abs(Dir, [file_type(directory)]),
   " && ",
   ppipe(Proc, Type).
pipe(sh(Type, Spec), Type) --> !,
   sh(Spec, Type).
pipe(sh(Type, Fmt, Args), Type) --> !,
   sh(Fmt-Args, Type).
pipe(Macro, Type) -->
   { def(Macro, Expansion) },
   pipe(Expansion, Type).
171
%!  sh(+Spec, ?Type)//
%
%   Emits the text of a primitive shell command. Spec is one of:
%
%     * an atomic term  - emitted with at//1;
%     * Fmt-Args        - each member of Args is converted by arg_arg/3
%                         and the results are handed to fmt//2 with Fmt;
%     * [Cmd|Args]      - every element is emitted with arg//2, with a
%                         single space between elements;
%     * \Phrase         - Phrase is called as a DCG body.
%
%   Type is the command's stream type; it is passed on to the argument
%   handlers and is otherwise unused here.
sh(Text, _) -->
   { atomic(Text) }, !,
   at(Text).
sh(Format-Args, Type) --> !,
   { maplist(arg_arg(Type), Args, Formatted) },
   fmt(Format, Formatted).
sh([Command|Args], Type) --> !,
   seqmap_with_sep(" ", arg(Type), [Command|Args]).
sh(\Phrase, _) --> !,
   phrase(Phrase).
176
%!  shell_quote(+Term)//
%
%   Emits Term, rendered as by write/1, quoted for the shell. The
%   quoting style (`strong` or `weak`) is read from the setting
%   `quote_method` and applied by quote//2.
shell_quote(Term) -->
   {  setting(quote_method, Method),
      with_output_to(codes(Written), write(Term))
   },
   quote(Method, Written).
181
%!  file(+Spec, +Opts)//
%
%   Emits a shell-quoted file name. A Spec that is neither compound nor
%   an absolute file name is quoted exactly as given; any other Spec is
%   resolved through abs//2 using Opts.
file(Spec, Opts) -->
   (  { \+ compound(Spec), \+ is_absolute_file_name(Spec) }
   -> shell_quote(Spec)
   ;  abs(Spec, Opts)
   ).
187
%!  abs(+Spec, +Opts)//
%
%   Resolves Spec with absolute_file_name/3, passing `solutions(all)`
%   ahead of Opts, and emits the single resulting path shell-quoted.
%
%   @throws no_matching_file(Spec, Opts) when there is no solution.
%   @throws indeterminate_file(Spec, Opts, Paths) when there is more
%           than one distinct solution.
abs(Spec, Opts) -->
   {  % setof/3 fails when there are no solutions and never yields [].
      (  setof(Candidate,
               absolute_file_name(Spec, Candidate, [solutions(all)|Opts]),
               Candidates)
      -> true
      ;  throw(no_matching_file(Spec, Opts))
      ),
      (  Candidates = [Path]
      -> true
      ;  throw(indeterminate_file(Spec, Opts, Candidates))
      )
   },
   shell_quote(Path).
196
%!  arg_arg(?Type, +Arg, -Formatted)
%
%   Prepares one member of the argument list of sh(Type,Fmt,Args) for
%   use with the format string. An argument written \X is handed over
%   as X untouched; any other argument is expanded with arg//2 and the
%   resulting codes are returned as a string.
arg_arg(_, \Raw, Raw) :- !.
arg_arg(Type, Spec, Text) :-
   phrase(arg(Type, Spec), Codes),
   string_codes(Text, Codes).
201
%!  arg(?Type, +Arg)//
%
%   Emits the shell text for one command argument. Type is the stream
%   type (In>>Out) of the command the argument belongs to. Arg is one of:
%
%     * \Phrase          - Phrase is called as a DCG body;
%     * @Term            - Term is shell-quoted (shell_quote//1);
%     * Spec+Access      - file name checked with access(Access);
%     * file(Spec)       - file name with an empty option list;
%     * file(Spec,Opts)  - file name with options Opts;
%     * T<Pipeline       - emits `<(...)`; Pipeline must output a stream
%                          of type T, and its input type is related to
%                          the command's input type by lte/3;
%     * T>Pipeline       - emits `>(...)`; Pipeline must accept a stream
%                          of type T, and its output type is related to
%                          the command's output type by lte/3;
%     * $Pipeline        - emits `$(...)`; Pipeline's input type is
%                          related to the command's input type by lte/3;
%     * an atomic term   - emitted with at//1.
%
%   None of the clauses cut.
arg(_, \Phrase) -->
   phrase(Phrase).
arg(_, @Term) -->
   shell_quote(Term).
arg(_, Spec+Access) -->
   file(Spec, [access(Access)]).
arg(_, file(Spec)) -->
   file(Spec, []).
arg(_, file(Spec, Opts)) -->
   file(Spec, Opts).
arg(CmdIn >> _, StreamType<Pipeline) -->
   "<(",
   pipe(Pipeline, PipeIn >> $StreamType),
   ")",
   { lte(proc_subs_in(CmdIn, PipeIn, Pipeline), PipeIn, CmdIn) }.
arg(_ >> CmdOut, StreamType>Pipeline) -->
   ">(",
   pipe(Pipeline, $StreamType >> PipeOut),
   ")",
   { lte(proc_subs_out(CmdOut, PipeOut, Pipeline), PipeOut, CmdOut) }.
arg(CmdIn >> _, $Pipeline) -->
   "$(",
   pipe(Pipeline, PipeIn >> _),
   ")",
   { lte(cmd_subs(CmdIn, PipeIn, Pipeline), PipeIn, CmdIn) }.
arg(_, Atomic) -->
   { atomic(Atomic) },
   at(Atomic).
211
212
%!  seq_types(+Proc, ?Type1, ?Type2, ?Type)
%
%   Combines the types of two sequentially executed processes. The
%   input types and the output types are each combined with meet/4.
%   Proc is only used to label a type_mismatch exception.
seq_types(Proc, Type1, Type2, Type) :-
   Type1 = (In1 >> Out1),
   Type2 = (In2 >> Out2),
   Type  = (In >> Out),
   meet(input_of(Proc), In1, In2, In),
   meet(output_of(Proc), Out1, Out2, Out).
216
%!  par_types(+Proc, ?Type1, ?Type2, ?Type)
%
%   Combines the types of two concurrently executed processes. The
%   input types are combined with either/4 (one of them must be 0) and
%   the output types with meet/4. Proc is only used to label a
%   type_mismatch exception.
par_types(Proc, Type1, Type2, Type) :-
   Type1 = (In1 >> Out1),
   Type2 = (In2 >> Out2),
   Type  = (In >> Out),
   either(input_of(Proc), In1, In2, In),
   meet(output_of(Proc), Out1, Out2, Out).
221
%!  u(+Context, ?Type1, ?Type2)
%
%   Unifies Type1 with Type2.
%
%   @throws type_mismatch(Context, Type1, Type2) if they do not unify.
u(Context, Type1, Type2) :-
   (  Type1 = Type2
   -> true
   ;  throw(type_mismatch(Context, Type1, Type2))
   ).
224
%!  lte(+Context, ?Type1, ?Type2)
%
%   Succeeds if Type1 unifies with Type2, or failing that, if Type1 is
%   0 (no stream).
%
%   @throws type_mismatch(Context, Type1, Type2) otherwise.
lte(Context, Type1, Type2) :-
   (  Type1 = Type2 -> true
   ;  Type1 = 0     -> true
   ;  throw(type_mismatch(Context, Type1, Type2))
   ).
228
%!  meet(+Context, ?Type1, ?Type2, ?Type)
%
%   Combines two stream types. The alternatives are tried in order:
%   Type1 and Type2 unify, giving that type; Type1 is 0, giving Type2;
%   Type2 is 0, giving Type1.
%
%   @throws type_mismatch(Context, Type1, Type2) if no alternative
%           applies.
meet(Context, Type1, Type2, Type) :-
   (  Type1 = Type2, Type = Type1 -> true
   ;  Type1 = 0,     Type = Type2 -> true
   ;  Type2 = 0,     Type = Type1 -> true
   ;  throw(type_mismatch(Context, Type1, Type2))
   ).
233
%!  either(+Context, ?Type1, ?Type2, ?Type)
%
%   Combines two stream types of which at least one must be 0: Type is
%   the other one. Type1 is tested for 0 first.
%
%   @throws type_mismatch(Context, Type1, Type2) if neither is 0.
either(Context, Type1, Type2, Type) :-
   (  Type1 = 0, Type = Type2 -> true
   ;  Type2 = 0, Type = Type1 -> true
   ;  throw(type_mismatch(Context, Type1, Type2))
   ).
%!  command(+Pipeline, -Cmd)
%!  command(+Pipeline, ?Type, -Cmd)
%
%   Translates Pipeline into the shell command string Cmd. Type is
%   unified with the type found for Pipeline by pipe//2; command/2
%   accepts any type. Fails if pipe//2 has no translation for Pipeline.
%
%   @throws swipe_type_mismatch(Pipeline, Inferred, Type) if the type
%           found for the pipeline does not unify with Type.
command(Pipeline, Cmd) :-
   command(Pipeline, _AnyType, Cmd).

command(Pipeline, Type, Cmd) :-
   phrase(pipe(Pipeline, Inferred), Codes),
   (  Type \= Inferred
   -> throw(swipe_type_mismatch(Pipeline, Inferred, Type))
   ;  Type = Inferred
   ),
   string_codes(Cmd, Codes).
%!  run(+Pipeline)
%
%   Translates Pipeline with pipe//2 (first solution only), reports the
%   type and command on debug topic `swipe`, and executes the command
%   with shell/1.
%
%   @throws bad_pipeline(Pipeline) if Pipeline cannot be translated.
run(Pipeline) :-
   (  phrase(pipe(Pipeline, Type), CmdCodes)
   -> true
   ;  throw(bad_pipeline(Pipeline))
   ),
   debug(swipe, "Executing: ~w, ~s", [Type, CmdCodes]),
   shell(CmdCodes).
%!  with_pipe_output(-Stream, +Pipeline, :Goal)
%
%   Runs Goal with Stream opened for reading on pipe(Cmd), where Cmd is
%   the shell command for Pipeline. Pipeline must have type 0 >> $_,
%   i.e. no input stream and some output stream. The stream is managed
%   by with_stream/3.
with_pipe_output(Stream, Pipeline, Goal) :-
   RequiredType = (0 >> $_),
   command(Pipeline, RequiredType, ShellCmd),
   debug(swipe, 'reading from pipeline: ~s', [ShellCmd]),
   with_stream(Stream, open(pipe(ShellCmd), read, Stream), Goal).
%!  with_pipe_input(-Stream, +Pipeline, :Goal)
%
%   Runs Goal with Stream opened for writing on pipe(Cmd), where Cmd is
%   the shell command for Pipeline. Pipeline must have type $_ >> 0,
%   i.e. some input stream and no output stream. The stream is managed
%   by with_stream/3.
with_pipe_input(Stream, Pipeline, Goal) :-
   RequiredType = ($_ >> 0),
   command(Pipeline, RequiredType, ShellCmd),
   debug(swipe, 'writing to pipeline: ~s', [ShellCmd]),
   with_stream(Stream, open(pipe(ShellCmd), write, Stream), Goal).
%!  with_pipe_io(-Streams, +Pipeline, :Goal)
%
%   Runs Goal while the shell command for Pipeline executes under
%   `bash -c`. Streams is a pair In-Out: In is the stream of the pipe
%   attached to the process's stdin and Out the stream of the pipe
%   attached to its stdout. Pipeline must have type $_ >> $_. Both
%   streams are force-closed once Goal is finished.
with_pipe_io(ToProcess-FromProcess, Pipeline, Goal) :-
   command(Pipeline, $_ >> $_, ShellCmd),
   debug(swipe, 'reading/writing pipeline: ~s', [ShellCmd]),
   ProcessOptions = [ stdin(pipe(ToProcess)),
                      stdout(pipe(FromProcess))
                    ],
   setup_call_cleanup(
      process_create(path(bash), ['-c', ShellCmd], ProcessOptions),
      Goal,
      (  close(ToProcess,   [force(true)]),
         close(FromProcess, [force(true)])
      )).
301
%!  quote(+Method, +Codes)//
%
%   Emits Codes wrapped in shell quotes: single quotes for Method
%   `strong`, double quotes for `weak`. The content between the quotes
%   is produced by esc//2 with the escaping rule named by Method
%   (strong//2 or weak//2).
quote(strong, Codes) -->
   [0'\'],
   esc(strong, Codes),
   [0'\'].
quote(weak, Codes) -->
   [0'"],
   esc(weak, Codes),
   [0'"].
304
%!  weak(?Codes, ?Rest)//
%
%   Escaping rule for one character inside double quotes. Consumes the
%   first code of Codes, leaving Rest. A backslash, dollar sign, double
%   quote or backquote is emitted with a backslash in front of it; any
%   other code is emitted unchanged.
weak([Code|Rest], Rest) -->
   [0'\\, Code],
   { weak_special(Code) }.
weak([Code|Rest], Rest) -->
   [Code],
   { \+ weak_special(Code) }.

% weak_special(?Code): Code is one of the codes that weak//2 prefixes
% with a backslash.
weak_special(Code) :-
   member(Code, `\\$"\``).
%!  strong(?Codes, ?Rest)//
%
%   Escaping rule for one character inside single quotes. Consumes the
%   first code of Codes, leaving Rest, and emits it. When that code is a
%   single quote it is followed by `\''`, so the emitted text is `'\''`:
%   close the quoted string, add an escaped quote, reopen the string.
%
%   The quote character is written 0'\' so that the literal does not
%   depend on how the reader treats the character following 0''.
strong([C|T], T) -->
   [C],
   (  { C = 0'\' }
   -> "\\''"
   ;  []
   ).
/** <module> Shell pipeline execution utilities
This module provides a mechanism for composing and running Unix shell pipelines. It defines a typed algebraic term language using operators for piping and redirections while checking that the type of data passing through the standard input and output streams of each subprocess match with those of connected processes. The language is only capable of describing simple, linear pipelines, where each process can have one or zero input streams and one or zero output streams. The type of a process is denoted by a term
X>>Y
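, where X and Y are stream types and can be 0 for no stream, or $T for a stream of type T, where T is an arbitrary term describing what sort of data is in the stream, e.g., plain text or XML. The typing judgements are as follows:
   P >> Q     : X>>Z   if P : X>>Y and Q : Y>>Z
   F^T :> P   : 0>>Y   if P : $T>>Y
   P >: F^T   : X>>0   if P : X>>$T
   in(D,P)    : T      if P : T
   sh(T,Cmd)  : T
The rules for combining types with the * operator (shell &&, sequential execution) and + operator (shell &, concurrent execution) are encoded in the predicates seq_types and par_types. The rules for sequential execution, applied to the input types and to the output types separately, are:
   0 * T = T
   T * 0 = T
   T * T = T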
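The rules for concurrent execution are
   0 + T = T
   T + 0 = T
for the input types (at least one of the two processes must have no input stream), while the output types are combined as for sequential execution.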
If the type requirements are not met, then the system throws a helpful type_mismatch exception.
The primitive processes are expressed as shell commands. A term
sh(T,Cmd)
, where T is an explicitly given type, corresponds to a shell command Cmd, written, including arguments, as you would type it into the Unix shell. Arguments can be handled using the form sh(T,Fmt,Args)
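, where Fmt is a format string as used by format/2, and Args is a list of arguments, each of which is one of:
   \Term            Term is passed to the format string unchanged
   @Term            Term is written out and quoted for the shell
   Spec+Access      a file name (see below)
   file(Spec)       a file name, expanded with no options
   file(Spec,Opts)  a file name, expanded with options Opts
   T<Pipeline       process substitution <(...), Pipeline producing a stream of type T
   T>Pipeline       process substitution >(...), Pipeline consuming a stream of type T
   $Pipeline        command substitution $(...)
   Atomic           any atomic term, inserted as written
In process redirection, a command expecting to read from or write to a named file can be redirected to a bash pipeline. In this case, one end of the pipeline is attached to the command, but the other end is left free. The input/output type of that free end interacts with the type of the overall command being constructed in the same way as parallel processes interact.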
File names
File names should be passed to sh/3 as Spec+Access. If Spec is atomic, it is treated as an explicit absolute or relative path in the file system and is quoted and escaped so that any special characters in the path are properly handled.
If Spec is a compound term, the system uses absolute_file_name/3 with the
access(Access)
option to expand Spec. This must succeed exactly once, otherwise an exception is thrown. The resulting path is quoted and escaped. In both cases, the result is captured by '~s' in the format string. There is a subtlety in the handling of compound file specifier terms: the file must exist with the correct access at pipeline composition time---if the file is only created when the pipeline is run, then the path expansion will fail. In these cases, you must use an atomic file specifier, or the (@)/1 operator. This also applies to files used with the redirection operators (:>)/2 and (>:)/2.
Declaring new processes
New compound pipelines can be declared using the multifile predicate def/2. The commands cat/0, cat/1 and echo/1 are already defined.
Running
A pipeline expression can be used in one of three ways:
open(pipe(Cmd), ...)
.*/