1/* Part of SWI-Prolog 2 3 Author: Jan Wielemaker 4 E-mail: J.Wielemaker@vu.nl 5 WWW: http://www.swi-prolog.org 6 Copyright (c) 2002-2022, University of Amsterdam 7 VU University Amsterdam 8 CWI, Amsterdam 9 All rights reserved. 10 11 Redistribution and use in source and binary forms, with or without 12 modification, are permitted provided that the following conditions 13 are met: 14 15 1. Redistributions of source code must retain the above copyright 16 notice, this list of conditions and the following disclaimer. 17 18 2. Redistributions in binary form must reproduce the above copyright 19 notice, this list of conditions and the following disclaimer in 20 the documentation and/or other materials provided with the 21 distribution. 22 23 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 24 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 25 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 26 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 27 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 28 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 29 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 30 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 31 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 32 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 33 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 34 POSSIBILITY OF SUCH DAMAGE. 
35*/ 36 37:- module(thread_httpd, 38 [ http_current_server/2, % ?:Goal, ?Port 39 http_server_property/2, % ?Port, ?Property 40 http_server/2, % :Goal, +Options 41 http_workers/2, % +Port, ?WorkerCount 42 http_add_worker/2, % +Port, +Options 43 http_current_worker/2, % ?Port, ?ThreadID 44 http_stop_server/2, % +Port, +Options 45 http_spawn/2, % :Goal, +Options 46 47 http_requeue/1, % +Request 48 http_close_connection/1, % +Request 49 http_enough_workers/3 % +Queue, +Why, +Peer 50 ]). 51:- use_module(library(debug)). 52:- use_module(library(error)). 53:- use_module(library(option)). 54:- use_module(library(socket)). 55:- use_module(library(thread_pool)). 56:- use_module(library(gensym)). 57:- use_module(http_wrapper). 58:- use_module(http_path). 59 60:- autoload(library(uri), [uri_resolve/3]). 61 62:- predicate_options(http_server/2, 2, 63 [ port(any), 64 unix_socket(atom), 65 entry_page(atom), 66 tcp_socket(any), 67 workers(positive_integer), 68 timeout(number), 69 keep_alive_timeout(number), 70 silent(boolean), 71 ssl(list(any)), % if http/http_ssl_plugin is loaded 72 pass_to(system:thread_create/3, 3) 73 ]). 74:- predicate_options(http_spawn/2, 2, 75 [ pool(atom), 76 pass_to(system:thread_create/3, 3), 77 pass_to(thread_pool:thread_create_in_pool/4, 4) 78 ]). 79:- predicate_options(http_add_worker/2, 2, 80 [ timeout(number), 81 keep_alive_timeout(number), 82 max_idle_time(number), 83 pass_to(system:thread_create/3, 3) 84 ]).
%   Meta-argument declarations for the public API. `:` marks a
%   module-sensitive argument (http_server/2 unpacks its option list
%   as M:Options0) and `0` marks a goal that is called without
%   additional arguments.

:- meta_predicate
    http_server(:, :),
    http_current_server(:, ?),
    http_spawn(0, +).

%   Bookkeeping of running servers and their worker threads.

:- dynamic
    current_server/6,       % Port, Goal, Thread, Queue, Scheme, StartTime
    queue_worker/2,         % Queue, ThreadID
    queue_options/2.        % Queue, Options

%   Hooks that allow plugins to take over socket creation, accepting,
%   closing, client stream setup and worker-pool scheduling.

:- multifile
    make_socket_hook/3,
    accept_hook/2,
    close_hook/1,
    open_client_hook/6,
    discard_client_hook/1,
    http:create_pool/1,
    http:schedule_workers/1.

%   thread_repeat_wait/1 runs its argument using call/1.

:- meta_predicate
    thread_repeat_wait(0).
%   ... main thread.
%   If you need to control resource usage you may consider the
%   `spawn` option of http_handler/3 and library(thread_pool).
%
%     - silent(Bool)
%       If `true` (default `false`), do not print an informational
%       message that the server was started.
%
%   A typical initialization for an HTTP server that uses
%   http_dispatch/1 to relay requests to predicates is:
%
%   ```
%   :- use_module(library(http/thread_httpd)).
%   :- use_module(library(http/http_dispatch)).
%
%   start_server(Port) :-
%       http_server(http_dispatch, [port(Port)]).
%   ```
%
%   Note that multiple servers can coexist in the same Prolog process.
%   A notable application of this is to have both an HTTP and HTTPS
%   server, where the HTTP server redirects to the HTTPS server for
%   handling sensitive requests.
%!  http_server(:Goal, :Options) is det.
%
%   Start an HTTP server that handles requests by calling Goal. The
%   listen address is taken from the port(Port) or unix_socket(Path)
%   option. After the socket, the workers and the accept thread have
%   been created an informational message is printed, unless the
%   option silent(true) was given.
%
%   @error existence_error(server_address, Options) if Options holds
%          neither a port(_) nor a unix_socket(_) option.

http_server(Goal, Module:UserOptions) :-
    server_address(Address, UserOptions),
    !,
    make_socket(Address, Module:UserOptions, ServerOptions),
    create_workers(ServerOptions),
    create_server(Goal, Address, ServerOptions),
    (   \+ option(silent(true), UserOptions)
    ->  print_message(informational,
                      httpd_started_server(Address, UserOptions))
    ;   true
    ).
http_server(_Goal, _:UserOptions) :-
    existence_error(server_address, UserOptions).

%!  server_address(-Address, +Options) is semidet.
%
%   Address is the value of the port(_) option if present, else
%   unix_socket(Path) if that option is present. Fails otherwise.

server_address(Address, Options) :-
    option(port(Port), Options),
    !,
    Address = Port.
server_address(Address, Options) :-
    option(unix_socket(Path), Options),
    !,
    Address = unix_socket(Path).

%!  address_port(+Address, -Port) is det.
%
%   Strip the interface from Interface:Port or the wrapper from
%   unix_socket(Path). Any other address is returned unchanged.

address_port(_Interface:PortNo, PortNo) :- !.
address_port(unix_socket(SocketPath), SocketPath) :- !.
address_port(Other, Other).

%!  tcp_address(@Address) is semidet.
%
%   True when Address is unbound, an integer or a term Interface:Port.

tcp_address(Address) :-
    (   var(Address)
    ->  true
    ;   integer(Address)
    ->  true
    ;   Address = _Interface:_PortNo
    ).

%!  address_domain(+Address, -Domain) is det.
%
%   Domain is `inet6` if the interface of Address is resolved by
%   ip_name/2 to an ip/8 term and `inet` in all other cases.

address_domain(localhost:_, Domain) =>
    Domain = inet.
address_domain(Interface:_, Domain) =>
    (   catch(ip_name(Addr, Interface), error(_,_), fail),
        functor(Addr, ip, 8)
    ->  Domain = inet6
    ;   Domain = inet
    ).
address_domain(_, Domain) =>
    Domain = inet.
queue(QueueId)
.
%!  make_socket(?Address, :OptionsIn, -OptionsOut) is semidet.
%
%   Provide the listening socket for Address. OptionsOut is OptionsIn
%   extended with queue(QueueId) and, when the socket is created
%   here, tcp_socket(Socket). The clauses are tried in this order:
%
%     1. For TCP addresses, make_socket_hook/3 may take over.
%     2. If OptionsIn already holds tcp_socket(_), only the queue
%        name is added.
%     3. For TCP addresses a socket is created, bound and put in
%        listen mode with a backlog of 64.
%     4. If unix_domain_socket/1 is available, the same is done for
%        unix_socket(Path).

make_socket(Address, Module:OptionsIn, OptionsOut) :-
    tcp_address(Address),
    make_socket_hook(Address, Module:OptionsIn, OptionsOut),
    !.
make_socket(Address, _:OptionsIn, OptionsOut) :-
    option(tcp_socket(_), OptionsIn),
    !,
    make_addr_atom('httpd', Address, QueueId),
    OptionsOut = [queue(QueueId)|OptionsIn].
make_socket(Address, _:OptionsIn, OptionsOut) :-
    tcp_address(Address),
    !,
    address_domain(Address, Domain),
    socket_create(Listen, [domain(Domain)]),
    tcp_setopt(Listen, reuseaddr),
    tcp_bind(Listen, Address),
    tcp_listen(Listen, 64),
    % The queue name is built after binding: see tcp_bind/2 above.
    make_addr_atom('httpd', Address, QueueId),
    OptionsOut = [queue(QueueId), tcp_socket(Listen)|OptionsIn].
:- if(current_predicate(unix_domain_socket/1)).
make_socket(Address, _:OptionsIn, OptionsOut) :-
    Address = unix_socket(SocketPath),
    !,
    unix_domain_socket(Listen),
    tcp_bind(Listen, SocketPath),
    tcp_listen(Listen, 64),
    make_addr_atom('httpd', Address, QueueId),
    OptionsOut = [queue(QueueId), tcp_socket(Listen)|OptionsIn].
:- endif.
%!  make_addr_atom(+Scheme, +Address, -Atom) is det.
%
%   Atom is Scheme, the character `@` and a textual rendering of
%   Address concatenated into a single atom. It is used to name the
%   worker queue and the accept thread.
%
%   @error instantiation_error if (part of) Address is unbound.
%   @error domain_error(http_server_address, Address) if Address is
%          not one of the supported shapes.

make_addr_atom(Scheme, Address, Atom) :-
    phrase(address_parts(Address), AddressParts),
    Pieces = [Scheme, '@'|AddressParts],
    atomic_list_concat(Pieces, Atom).

%!  address_parts(+Address)// is det.
%
%   Emit the atomic pieces that make up Address. Handles atomic
%   values, Host:Port, ip(A,B,C,D) and unix_socket(Path).

address_parts(Unbound) -->
    { var(Unbound),
      !,
      instantiation_error(Unbound)
    }.
address_parts(Value) -->
    { atomic(Value) },
    !,
    [Value].
address_parts(Host:Port) -->
    !,
    address_parts(Host),
    [':'],
    address_parts(Port).
address_parts(ip(B1,B2,B3,B4)) -->
    !,
    [ B1, '.', B2, '.', B3, '.', B4 ].
address_parts(unix_socket(SocketPath)) -->
    [SocketPath].
address_parts(Unsupported) -->
    { domain_error(http_server_address, Unsupported) }.
%!  create_server(:Goal, +Address, +Options) is det.
%
%   Start the thread that accepts connections and register the
%   server in current_server/6. The accept thread gets an alias built
%   from the scheme and port. We wait for the thread to send
%   `server_started` before the server is registered.

create_server(Goal, Address, Options) :-
    get_time(StartedAt),
    memberchk(queue(Queue), Options),
    scheme(Scheme, Options),
    autoload_https(Scheme),
    address_port(Address, Port),
    make_addr_atom(Scheme, Port, ThreadAlias),
    thread_self(Me),
    thread_create(accept_server(Goal, Me, Options), _,
                  [ alias(ThreadAlias)
                  ]),
    thread_get_message(server_started),
    assert(current_server(Port, Goal, ThreadAlias, Queue, Scheme, StartedAt)).

%!  scheme(-Scheme, +Options) is det.
%
%   Scheme is the value of the scheme(_) option if given, `https` if
%   Options holds ssl(_) or ssl_instance(_) and `http` otherwise.

scheme(Scheme, Options) :-
    option(scheme(Scheme), Options),
    !.
scheme(Scheme, Options) :-
    option(ssl(_), Options),
    !,
    Scheme = https.
scheme(Scheme, Options) :-
    option(ssl_instance(_), Options),
    !,
    Scheme = https.
scheme(http, _).

%!  autoload_https(+Scheme) is det.
%
%   Load library(http/http_ssl_plugin) when an `https` server is
%   requested, no accept_hook/2 clause is defined yet and the plugin
%   source exists. Succeeds silently in all other cases.

autoload_https(https) :-
    \+ clause(accept_hook(_Goal, _Options), _),
    exists_source(library(http/http_ssl_plugin)),
    !,
    use_module(library(http/http_ssl_plugin)).
autoload_https(_).
%!  http_current_server(?:Goal, ?Port) is nondet.
%
%   True if Goal is the goal of a server registered at Port in
%   current_server/6.

http_current_server(Goal, Port) :-
    current_server(Port, Goal, _Thread, _Queue, _Scheme, _StartTime).
%!  http_server_property(?Port, ?Property) is nondet.
%
%   True if Property is a property of the HTTP server running at
%   Port. Port may also be given as Interface:Port with an integer
%   port, in which case the interface is ignored. Defined properties:
%
%     - goal(:Goal)
%       Goal used to start the server.
%     - scheme(-Scheme)
%       Scheme is one of `http` or `https`.
%     - start_time(-Time)
%       Time-stamp taken when the server was created.

http_server_property(Address, Property) :-
    (   nonvar(Address),
        Address = _Interface:PortNo,
        integer(PortNo)
    ->  server_property(Property, PortNo)
    ;   server_property(Property, Address)
    ).

server_property(goal(Goal), Port) :-
    current_server(Port, Goal, _Thread, _Queue, _Scheme, _StartTime).
server_property(scheme(Scheme), Port) :-
    current_server(Port, _Goal, _Thread, _Queue, Scheme, _StartTime).
server_property(start_time(Time), Port) :-
    current_server(Port, _Goal, _Thread, _Queue, _Scheme, Time).
%!  http_workers(+Port, ?Workers) is det.
%
%   Query or set the number of workers of the server at Port. If
%   Workers is an integer the pool is resized to that number.
%   Otherwise Workers is unified with the current number of workers.
%
%   @error instantiation_error if Port is not ground.
%   @error existence_error(http_server, Port) if no server is
%          registered at Port.

http_workers(Port, Workers) :-
    must_be(ground, Port),
    current_server(Port, _Goal, _Thread, Queue, _Scheme, _StartTime),
    !,
    (   integer(Workers)
    ->  resize_pool(Queue, Workers)
    ;   findall(Id, queue_worker(Queue, Id), Ids),
        length(Ids, Workers)
    ).
http_workers(Port, _Workers) :-
    existence_error(http_server, Port).
%!  http_add_worker(+Port, +Options) is det.
%
%   Add one worker to the server at Port. Options are merged with the
%   options stored for the server's queue; the values in Options are
%   passed first to merge_options/3.
%
%   @error instantiation_error if Port is not ground.
%   @error existence_error(http_server, Port) if no server is
%          registered at Port.

http_add_worker(Port, Options) :-
    must_be(ground, Port),
    current_server(Port, _Goal, _Thread, Queue, _Scheme, _StartTime),
    !,
    queue_options(Queue, StoredOptions),
    merge_options(Options, StoredOptions, MergedOptions),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, 1, Queue, AliasPrefix, MergedOptions).
http_add_worker(Port, _Options) :-
    existence_error(http_server, Port).
%!  http_current_worker(?Port, ?ThreadID) is nondet.
%
%   True if ThreadID is registered in queue_worker/2 as a worker for
%   the queue of the server at Port.

http_current_worker(Port, ThreadID) :-
    current_server(Port, _Goal, _Thread, WorkQueue, _Scheme, _StartTime),
    queue_worker(WorkQueue, ThreadID).
%!  accept_server(:Goal, +Initiator, +Options)
%
%   Body of the accept thread. Runs the accept loop until the
%   exception http_stop(Stopper) is raised. Then the server is
%   removed from current_server/6, queued but unprocessed connections
%   are closed, the server socket is closed and Stopper is sent the
%   message `http_stopped`.

accept_server(Goal, Initiator, Options) :-
    StopSignal = http_stop(Stopper),
    catch(accept_server2(Goal, Initiator, Options), StopSignal, true),
    thread_self(Me),
    debug(http(stop), '[~p]: accept server received ~p', [Me, StopSignal]),
    retract(current_server(_Port, _, Me, Queue, _Scheme, _StartTime)),
    close_pending_accepts(Queue),
    close_server_socket(Options),
    thread_send_message(Stopper, http_stopped).

%!  accept_server2(:Goal, +Initiator, +Options)
%
%   Tell Initiator that the server started and accept connections in
%   a failure-driven loop. The loop is only left by an exception for
%   which accept_rethrow_error/1 is true.

accept_server2(Goal, Initiator, Options) :-
    thread_send_message(Initiator, server_started),
    repeat,
    accept_iteration(Goal, Options).

%   accept_iteration(:Goal, +Options)
%
%   Accept a single connection. Never succeeds: it fails to make the
%   enclosing repeat/0 loop continue, or re-throws a stop request.
%   Other exceptions and failure of accept_server3/2 are printed as
%   errors.

accept_iteration(Goal, Options) :-
    (   catch(accept_server3(Goal, Options), Error, true)
    ->  nonvar(Error),
        (   accept_rethrow_error(Error)
        ->  throw(Error)
        ;   print_message(error, Error),
            fail
        )
    ;   print_message(error,                    % internal error
                      goal_failed(accept_server3(Goal, Options))),
        fail
    ).

%!  accept_server3(:Goal, +Options) is semidet.
%
%   Accept one connection, either through accept_hook/2 or by
%   accepting on the tcp_socket(_) from Options, and hand the client
%   to the worker queue. Afterwards check that enough workers are
%   available.

accept_server3(Goal, Options) :-
    accept_hook(Goal, Options),
    !.
accept_server3(Goal, Options) :-
    memberchk(tcp_socket(Listen), Options),
    memberchk(queue(Queue), Options),
    debug(http(connection), 'Waiting for connection', []),
    tcp_accept(Listen, ClientSocket, Peer),
    % sig_atomic/1: do not process signals while handing over.
    sig_atomic(send_to_worker(Queue, ClientSocket, Goal, Peer)),
    http_enough_workers(Queue, accept, Peer).

send_to_worker(Queue, ClientSocket, Goal, Peer) :-
    debug(http(connection), 'New HTTP connection from ~p', [Peer]),
    thread_send_message(Queue, tcp_client(ClientSocket, Goal, Peer)).

%   Exceptions that must terminate the accept loop.

accept_rethrow_error(http_stop(_)).
accept_rethrow_error('$aborted').
%!  close_server_socket(+Options) is semidet.
%
%   Close the server socket. The hook close_hook/1 is tried first. If
%   it fails, the socket from the tcp_socket(_) option is closed.
%   Fails if neither applies.

close_server_socket(Options) :-
    (   close_hook(Options)
    ->  true
    ;   memberchk(tcp_socket(Listen), Options),
        tcp_close_socket(Listen)
    ).
%!  close_pending_accepts(+Queue) is det.
%
%   Drain Queue without waiting and close every client that was
%   accepted but not yet picked up by a worker.

close_pending_accepts(Queue) :-
    thread_get_message(Queue, Pending, [timeout(0)]),
    !,
    close_client(Pending),
    close_pending_accepts(Queue).
close_pending_accepts(_Queue).

%!  close_client(+Message) is det.
%
%   Dispose of a queued message. Plain TCP clients are closed here.
%   Any other message is offered to discard_client_hook/1 and a
%   warning is printed if the hook does not handle it.

close_client(tcp_client(ClientSocket, _Goal, _0Peer)) =>
    debug(http(stop), 'Closing connection from ~p during shut-down', [_0Peer]),
    tcp_close_socket(ClientSocket).
close_client(Other) =>
    (   discard_client_hook(Other)
    ->  true
    ;   print_message(warning, http_close_client(Other))
    ).
%!  http_stop_server(+Port, +Options) is det.
%
%   Stop the HTTP server at Port. Port may be given as Host:Port,
%   in which case Host is ignored. Steps:
%
%     1. Stop all workers using http_workers/2.
%     2. Signal the accept thread with http_stop(Stopper).
%     3. Wait at most 0.1 seconds for the `http_stopped` reply. If it
%        does not arrive, make a connection to localhost:Port;
%        errors from this connection attempt are ignored.
%        NOTE(review): presumably this wakes up a blocking accept;
%        confirm.
%     4. Join the accept thread and destroy the worker queue.
%
%   Options is currently ignored.

http_stop_server(Host:Port, Options) :-         % e.g., localhost:4000
    ground(Host),
    !,
    http_stop_server(Port, Options).
http_stop_server(Port, _Options) :-
    http_workers(Port, 0),                  % checks Port is ground
    current_server(Port, _, AcceptThread, Queue, _Scheme, _Start),
    retractall(queue_options(Queue, _)),
    debug(http(stop), 'Signalling HTTP server thread ~p to stop', [AcceptThread]),
    thread_self(Me),
    thread_signal(AcceptThread, throw(http_stop(Me))),
    (   thread_get_message(Me, http_stopped, [timeout(0.1)])
    ->  true
    ;   catch(connect(localhost:Port), _, true)
    ),
    thread_join(AcceptThread, _0Status),
    debug(http(stop), 'Joined HTTP server thread ~p (~p)', [AcceptThread, _0Status]),
    message_queue_destroy(Queue).

%   connect(+Address)
%
%   Open a TCP connection to Address and close the socket again.

connect(Address) :-
    setup_call_cleanup(
        tcp_socket(Sock),
        tcp_connect(Sock, Address),
        tcp_close_socket(Sock)).
%!  http_enough_workers(+Queue, +Why, +Peer) is det.
%
%   Check whether Queue is served well enough. Nothing is done if
%   threads are waiting on the queue or if the queue size is
%   acceptable according to enough/2. Otherwise the hook
%   http:schedule_workers/1 is called with a dict holding the keys
%   `port`, `reason`, `peer`, `waiting` and `queue`. Exceptions from
%   the hook are printed and failure of the hook is ignored.

http_enough_workers(Queue, _Why, _Peer) :-
    message_queue_property(Queue, waiting(_0)),
    !,
    debug(http(scheduler), '~D waiting for work; ok', [_0]).
http_enough_workers(Queue, Why, Peer) :-
    message_queue_property(Queue, size(Pending)),
    (   enough(Pending, Why)
    ->  debug(http(scheduler), '~D in queue; ok', [Pending])
    ;   ignore(( current_server(Port, _, _, Queue, _, _),
                 Data = _{ port:Port,
                           reason:Why,
                           peer:Peer,
                           waiting:Pending,
                           queue:Queue
                         },
                 debug(http(scheduler), 'Asking to reschedule: ~p', [Data]),
                 catch(http:schedule_workers(Data),
                       Error,
                       print_message(error, Error))
               ))
    ).

%   enough(+QueueSize, +Why)
%
%   True if QueueSize pending jobs is acceptable for reason Why.

enough(0, _).
enough(1, keep_alive).                  % I will be ready myself
%   ... `accept` for a new connection or `keep_alive` if a
%   worker tries to reschedule itself.
%
%   Note that, when called with `reason:accept`, we are called in
%   the time critical main accept loop. An implementation of this
%   hook shall typically send the event to a thread dedicated to
%   dynamic worker-pool management.
                 /*******************************
                 *   WORKER QUEUE OPERATIONS    *
                 *******************************/
%!  create_workers(+Options) is det.
%
%   Create the worker queue named by the queue(_) option and start
%   the number of workers given by workers(N) (default 5). Errors
%   from creating the message queue are ignored. Options is stored in
%   queue_options/2 for use when workers are added later.

create_workers(Options) :-
    option(workers(Count), Options, 5),
    option(queue(Queue), Options),
    catch(message_queue_create(Queue), _, true),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, Count, Queue, AliasPrefix, Options),
    assert(queue_options(Queue, Options)).

%!  create_workers(+I, +N, +Queue, +AliasBase, +Options) is det.
%
%   Create workers I..N for Queue. Each worker thread gets a unique
%   alias generated from AliasBase using gensym/2 and is registered
%   in queue_worker/2. Options is also passed to thread_create/3.

create_workers(I, N, Queue, AliasBase, Options) :-
    (   I > N
    ->  true
    ;   gensym(AliasBase, WorkerAlias),
        thread_create(http_worker(Options), WorkerId,
                      [ alias(WorkerAlias)
                      | Options
                      ]),
        assertz(queue_worker(Queue, WorkerId)),
        Next is I + 1,
        create_workers(Next, N, Queue, AliasBase, Options)
    ).
%!  resize_pool(+Queue, +Size) is det.
%
%   Make the number of workers for Queue equal to Size. Missing
%   workers are created using the options stored for the queue. If
%   there are too many workers, one quit(Me) message is sent per
%   excess worker and we wait for the same number of quitted(_)
%   replies.

resize_pool(Queue, Size) :-
    findall(Id, queue_worker(Queue, Id), Ids),
    length(Ids, Current),
    (   Current < Size
    ->  queue_options(Queue, Options),
        atom_concat(Queue, '_', AliasPrefix),
        First is Current+1,
        create_workers(First, Size, Queue, AliasPrefix, Options)
    ;   Current > Size
    ->  TooMany is Current - Size,
        thread_self(Me),
        forall(between(1, TooMany, _), thread_send_message(Queue, quit(Me))),
        forall(between(1, TooMany, _), thread_get_message(quitted(_)))
    ;   true
    ).
%   If the message quit(Sender) is read from the queue, the worker
%   stops.
%!  http_worker(+Options)
%
%   Body of a worker thread. Registers done_worker/0 to run on thread
%   exit and then processes jobs from the queue(_) option in a
%   failure-driven loop driven by thread_repeat_wait/1.
%
%   On quit(Sender) the loop is cut and the thread detaches itself.
%   Unless Sender is `idle` (generated by get_work/3 on time out),
%   the worker unregisters and replies quitted(Self) to Sender.
%
%   Any other job is opened as a client and handed to http_process/4.
%   If that raises an exception or fails, the problem is printed at
%   the level given by current_message_level/2 and the connection is
%   closed. In all cases we fail back into the loop.

http_worker(Options) :-
    debug(http(scheduler), 'New worker', []),
    prolog_listen(this_thread_exit, done_worker),
    option(queue(Queue), Options),
    option(max_idle_time(MaxIdle), Options, infinite),
    thread_repeat_wait(get_work(Queue, Job, MaxIdle)),
      debug(http(worker), 'Waiting for a job ...', []),
      debug(http(worker), 'Got job ~p', [Job]),
      (   Job = quit(Sender)
      ->  !,                                    % leave the repeat loop
          thread_self(Self),
          thread_detach(Self),
          (   Sender == idle
          ->  true
          ;   retract(queue_worker(Queue, Self)),
              thread_send_message(Sender, quitted(Self))
          )
      ;   open_client(Job, Queue, Goal, In, Out,
                      Options, ClientOptions),
          (   catch(http_process(Goal, In, Out, ClientOptions),
                    Error, true)
          ->  true
          ;   Error = goal_failed(http_process/4)
          ),
          nonvar(Error),                        % all fine: next job
          current_message_level(Error, Level),
          print_message(Level, Error),
          memberchk(peer(Peer), ClientOptions),
          close_connection(Peer, In, Out),
          fail
      ).

%!  get_work(+Queue, -Message, +MaxIdle) is det.
%
%   Wait for the next Message from Queue. If MaxIdle is `infinite` we
%   wait forever. Otherwise MaxIdle is used as timeout and Message is
%   quit(idle) if nothing arrived in time.

get_work(Queue, Message, infinite) :-
    !,
    thread_get_message(Queue, Message).
get_work(Queue, Message, IdleLimit) :-
    (   thread_get_message(Queue, Message, [timeout(IdleLimit)])
    ->  true
    ;   Message = quit(idle)
    ).
%!  open_client(+Message, +Queue, -Goal, -In, -Out,
%!              +Options, -ClientOptions) is semidet.
%
%   Turn a queue Message into the streams In and Out for a client.
%
%   For a requeue/4 message the streams and client options are taken
%   from the message and we wait for data on the connection using the
%   keep_alive_timeout(_) option (default 2). For all other messages
%   the connection is opened using open_client/6. Errors from opening
%   are printed, after which the predicate fails. ClientOptions is
%   extended with pool(client(Queue,Goal,In,Out)) and timeout(_)
%   (default 60).

open_client(requeue(In, Out, Goal, ClientOptions),
            _Queue, Goal, In, Out, Options, ClientOptions) :-
    !,
    memberchk(peer(Peer), ClientOptions),
    option(keep_alive_timeout(KeepAlive), Options, 2),
    check_keep_alive_connection(In, KeepAlive, Peer, In, Out).
open_client(Message, Queue, Goal, In, Out, Options,
            [ pool(client(Queue, Goal, In, Out)),
              timeout(Timeout)
            | ConnectionOptions
            ]) :-
    catch(open_client(Message, Goal, In, Out, ConnectionOptions, Options),
          Error, report_error(Error)),
    option(timeout(Timeout), Options, 60),
    (   debugging(http(connection))
    ->  memberchk(peer(Peer), ConnectionOptions),
        debug(http(connection), 'Opened connection from ~p', [Peer])
    ;   true
    ).
%!  open_client(+Message, -Goal, -In, -Out,
%!              -ClientOptions, +Options) is semidet.
%
%   Open the streams for a newly accepted client. The hook
%   open_client_hook/6 is tried first. Plain tcp_client/3 messages
%   are opened using tcp_open_socket/3, yielding the client options
%   peer(Peer) and protocol(http).

open_client(Message, Goal, In, Out, ClientOptions, Options) :-
    open_client_hook(Message, Goal, In, Out, ClientOptions, Options),
    !.
open_client(tcp_client(ClientSocket, Goal, Peer), Goal, In, Out,
            [ peer(Peer),
              protocol(http)
            ], _Options) :-
    tcp_open_socket(ClientSocket, In, Out).

%   report_error(+Error)
%
%   Print Error as an error message and fail.

report_error(Error) :-
    print_message(error, Error),
    fail.
%!  check_keep_alive_connection(+In, +TimeOut, +Peer, +In, +Out) is semidet.
%
%   Wait for the client to send a new request on a Keep-Alive
%   connection. The timeout of In is temporarily set to TimeOut while
%   peeking for input. Succeeds, restoring the old timeout, if input
%   is available. If the peek raises an exception or hits end of
%   file, the connection is closed and the predicate fails.

check_keep_alive_connection(In, TMO, Peer, In, Out) :-
    stream_property(In, timeout(SavedTimeout)),
    set_stream(In, timeout(TMO)),
    debug(http(keep_alive), 'Waiting for keep-alive ...', []),
    catch(peek_code(In, Code), Error, true),
    (   nonvar(Error)                   % exception while waiting
    ->  Status = timeout
    ;   Code == -1                      % end-of-file
    ->  Status = closed
    ;   Status = ready
    ),
    (   Status == ready
    ->  set_stream(In, timeout(SavedTimeout)),
        debug(http(keep_alive), '\tre-using keep-alive connection', [])
    ;   (   Status == closed
        ->  debug(http(keep_alive), '\tRemote closed keep-alive connection', [])
        ;   debug(http(keep_alive), '\tTimeout on keep-alive connection', [])
        ),
        close_connection(Peer, In, Out),
        fail
    ).
%!  done_worker
%
%   Called when a worker thread exits. If the thread is still
%   registered in queue_worker/2 it is detached and unregistered and
%   we try to replace it using recreate_worker/2. If no replacement
%   was created the stop is reported at the level given by
%   done_status_message_level/2. The second clause handles workers
%   that already unregistered themselves.

done_worker :-
    thread_self(Me),
    thread_detach(Me),
    retract(queue_worker(Queue, Me)),
    thread_property(Me, status(ExitStatus)),
    !,
    (   catch(recreate_worker(ExitStatus, Queue), _, fail)
    ->  print_message(informational,
                      httpd_restarted_worker(Me))
    ;   done_status_message_level(ExitStatus, Level),
        print_message(Level,
                      httpd_stopped_worker(Me, ExitStatus))
    ).
done_worker :-                                  % received quit(Sender)
    thread_self(Me),
    thread_property(Me, status(ExitStatus)),
    done_status_message_level(ExitStatus, Level),
    print_message(Level,
                  httpd_stopped_worker(Me, ExitStatus)).

%   done_status_message_level(+Status, -Level)
%
%   Normal termination and abort are silent. Anything else is
%   reported as informational.

done_status_message_level(true, silent) :- !.
done_status_message_level(exception('$aborted'), silent) :- !.
done_status_message_level(_, informational).
%   The first clause deals with the possibility that we cannot write to
%   `user_error`. This is possible when Prolog is started as a service
%   using some service managers. Would be nice if we could write an
%   error, but where?
%!  recreate_worker(+Status, +Queue) is semidet.
%
%   Decide what to do with a worker that died with Status. If the
%   worker died because it could not write to `user_error` the
%   process halts with exit code 2. If it died from an exception for
%   which recreate_on_error/1 holds, a replacement worker is created
%   for Queue. Fails otherwise.

recreate_worker(exception(error(io_error(write,user_error),_)), _Queue) :-
    halt(2).
recreate_worker(exception(Reason), Queue) :-
    recreate_on_error(Reason),
    queue_options(Queue, StoredOptions),
    atom_concat(Queue, '_', AliasPrefix),
    create_workers(1, 1, Queue, AliasPrefix, StoredOptions).

%   Exceptions after which a worker is replaced.

recreate_on_error('$aborted').
recreate_on_error(time_limit_exceeded).
%!  message_level(+Exception, -Level) is semidet.
%
%   Multifile table that maps exceptions raised while processing a
%   request to the level used for printing them.

:- multifile
    message_level/2.

message_level(error(io_error(read, _), _),               silent).
message_level(error(socket_error(epipe,_), _),           silent).
message_level(error(http_write_short(_Obj,_Written), _), silent).
message_level(error(timeout_error(read, _), _),          informational).
message_level(keep_alive_timeout,                        silent).

%!  current_message_level(+Term, -Level) is det.
%
%   Level is the first matching level from message_level/2, or
%   `error` if Term is not in that table.

current_message_level(Term, Level) :-
    (   message_level(Term, Known)
    ->  Level = Known
    ;   Level = error
    ).
%!  http_requeue(+Header) is semidet.
%
%   Put the connection described by Header back into the worker
%   queue so that a worker can wait for the next request on it. Only
%   the pool(_), peer(_) and protocol(_) fields of Header are kept.
%   If this fails a debug message is printed on the topic
%   http(error) and the predicate fails.

http_requeue(Header) :-
    requeue_header(Header, ClientOptions),
    memberchk(pool(client(Queue, Goal, In, Out)), ClientOptions),
    memberchk(peer(Peer), ClientOptions),
    http_enough_workers(Queue, keep_alive, Peer),
    thread_send_message(Queue, requeue(In, Out, Goal, ClientOptions)),
    !.
http_requeue(Header) :-
    debug(http(error), 'Re-queue failed: ~p', [Header]),
    fail.

%   requeue_header(+Header, -Kept)
%
%   Kept holds the fields of Header for which requeue_keep/1 is
%   true, in their original order.

requeue_header([], []).
requeue_header([Field|Rest], [Field|KeptRest]) :-
    requeue_keep(Field),
    !,
    requeue_header(Rest, KeptRest).
requeue_header([_Dropped|Rest], Kept) :-
    requeue_header(Rest, Kept).

%   Request fields that describe the connection rather than the
%   individual request.

requeue_keep(pool(_)).
requeue_keep(peer(_)).
requeue_keep(protocol(_)).
%!  http_process(:Goal, +In, +Out, +Options)
%
%   Handle a single request on the streams In and Out. The
%   timeout(_) option (default 60) is applied to both streams before
%   running http_wrapper/5. What happens to the connection afterwards
%   is decided by next/2.

http_process(Goal, In, Out, Options) :-
    debug(http(server), 'Running server goal ~p on ~p -> ~p',
          [Goal, In, Out]),
    option(timeout(Timeout), Options, 60),
    set_stream(In, timeout(Timeout)),
    set_stream(Out, timeout(Timeout)),
    http_wrapper(Goal, In, Out, Connection,
                 [ request(Request)
                 | Options
                 ]),
    next(Connection, Request).

%   next(+Connection, +Request)
%
%   Act on the Connection state after a request. Prints a warning
%   instead of failing if next_/2 fails.

next(Connection, Request) :-
    next_(Connection, Request),
    !.
next(Connection, Request) :-
    print_message(warning, goal_failed(next(Connection,Request))).

%   next_(+Connection, +Request)
%
%     - switch_protocol(Goal, Options): call Goal on the streams of
%       the connection. If this fails or raises an exception (which
%       is printed), the connection is closed.
%     - spawned(ThreadId): another thread handles the connection.
%     - `Keep-Alive` in any letter case: requeue the connection.
%     - Anything else, or failure to requeue: close the connection.

next_(switch_protocol(SwitchGoal, _SwitchOptions), Request) :-
    !,
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    (   catch(call(SwitchGoal, In, Out), Error,
              ( print_message(error, Error),
                fail))
    ->  true
    ;   http_close_connection(Request)
    ).
next_(spawned(ThreadId), _Request) :-
    !,
    debug(http(spawn), 'Handler spawned to thread ~w', [ThreadId]).
next_(Connection, Request) :-
    downcase_atom(Connection, 'keep-alive'),
    http_requeue(Request),
    !.
next_(_Connection, Request) :-
    http_close_connection(Request).
%!  http_close_connection(+Request)
%
%   Close the connection that belongs to Request. The streams are
%   taken from the pool(client(_,_,In,Out)) field and the peer from
%   the peer(_) field of Request.

http_close_connection(Request) :-
    memberchk(peer(Peer), Request),
    memberchk(pool(client(_Queue, _Goal, In, Out)), Request),
    close_connection(Peer, In, Out).
%!  close_connection(+Peer, +In, +Out) is det.
%
%   Close both streams of a connection using force(true). Exceptions
%   raised while closing are ignored. Peer is only used for the debug
%   message.

close_connection(Peer, In, Out) :-
    debug(http(connection), 'Closing connection from ~p', [Peer]),
    ForceClose = [force(true)],
    catch(close(In, ForceClose), _, true),
    catch(close(Out, ForceClose), _, true).
%   If a pool does not exist, this predicate calls the multifile hook
%   http:create_pool/1 to create it. If this predicate succeeds the
%   operation is retried.
%!  http_spawn(:Goal, +Options) is det.
%
%   Run Goal in a new detached thread that continues writing to the
%   current output. If Options holds pool(Pool) the thread is created
%   in that thread pool and the remaining options are passed on to
%   thread_create_in_pool/4. Otherwise all options are passed to
%   thread_create/3.
%
%   If the pool has no free threads, http_reply(busy) is thrown. If
%   the pool does not exist and create_pool/1 succeeds, the request
%   is retried. Other errors are re-thrown.

http_spawn(Goal, Options) :-
    select_option(pool(Pool), Options, ThreadOptions),
    !,
    current_output(CGI),
    catch(thread_create_in_pool(Pool,
                                wrap_spawned(CGI, Goal), ThreadId,
                                [ detached(true)
                                | ThreadOptions
                                ]),
          Error,
          true),
    (   var(Error)
    ->  http_spawned(ThreadId)
    ;   Error = error(resource_error(threads_in_pool(_)), _)
    ->  throw(http_reply(busy))
    ;   Error = error(existence_error(thread_pool, Pool), _),
        create_pool(Pool)
    ->  http_spawn(Goal, Options)
    ;   throw(Error)
    ).
http_spawn(Goal, Options) :-
    current_output(CGI),
    thread_create(wrap_spawned(CGI, Goal), ThreadId,
                  [ detached(true)
                  | Options
                  ]),
    http_spawned(ThreadId).

%   wrap_spawned(+CGI, :Goal)
%
%   Entry point of the spawned thread: make CGI the current output,
%   run Goal through http_wrap_spawned/3 and handle the resulting
%   connection state using next/2.

wrap_spawned(CGI, Goal) :-
    set_output(CGI),
    http_wrap_spawned(Goal, Request, Connection),
    next(Connection, Request).
%!  create_pool(+Pool)
%
%   Create the thread pool Pool on demand. First the multifile hook
%   http:create_pool/1 is called. A permission error for creating the
%   pool is ignored; presumably this means another thread created the
%   pool concurrently (TODO confirm). If the hook fails, a pool of
%   size 10 is created and an informational message is printed.

create_pool(Pool) :-
    E = error(permission_error(create, thread_pool, Pool), _),
    catch(http:create_pool(Pool), E, true).
create_pool(Pool) :-
    print_message(informational, httpd(created_pool(Pool))),
    thread_pool_create(Pool, 10, []).


                 /*******************************
                 *        WAIT POLICIES         *
                 *******************************/

%   thread_repeat_wait/1 runs its argument using call/1, so the
%   argument is a goal called with no additional arguments.

:- meta_predicate
    thread_repeat_wait(0).
%!  thread_repeat_wait(:Goal) is multi.
%
%   Acts as `repeat, thread_idle(Goal)`, choosing whether to use a
%   `long` or `short` idle time based on the average firing rate. If
%   the average reported by update_rate_mma/2 is below 0.05, Goal is
%   called directly without thread_idle/2.

thread_repeat_wait(Goal) :-
    new_rate_mma(5, 1000, RateState),
    repeat,
    update_rate_mma(RateState, Average),
    long(Average, IdleKind),
    (   IdleKind \== brief
    ->  thread_idle(Goal, IdleKind)
    ;   call(Goal)
    ).

%   long(+Average, -IdleKind)
%
%   Classify the average as `brief` (< 0.05), `short` (< 1) or
%   `long`.

long(Average, brief) :-
    Average < 0.05,
    !.
long(Average, short) :-
    Average < 1,
    !.
long(_Average, long).
%!  new_rate_mma(+N, +Resolution, -State) is det.
%!  update_rate_mma(!State, -MMA) is det.
%
%   Maintain a modified moving average (MMA) over N samples of the
%   time between successive calls to update_rate_mma/2. Time stamps
%   are kept as integers counting 1/Resolution seconds since a base
%   time and MMA is returned in seconds. State is updated
%   destructively using nb_setarg/3. Its layout is
%
%       rstate(Base, Last, MaxI, Resolution, N, MMA)
%
%   If the stamp exceeds MaxI (the `max_tagged_integer` flag), the
%   base time is reset to now and Last to 0. The large stamp must not
%   be stored as Last in that case: the next stamp is relative to the
%   new base and would otherwise yield a huge negative interval.

new_rate_mma(N, Resolution, rstate(Base, 0, MaxI, Resolution, N, 0)) :-
    current_prolog_flag(max_tagged_integer, MaxI),
    get_time(Base).

update_rate_mma(State, MMAr) :-
    State = rstate(Base, Last, MaxI, Resolution, N, MMA0),
    get_time(Now),
    Stamp is round((Now-Base)*Resolution),
    Diff is Stamp-Last,                 % interval relative to old base
    (   Stamp > MaxI
    ->  nb_setarg(1, State, Now),       % rebase
        nb_setarg(2, State, 0)
    ;   nb_setarg(2, State, Stamp)
    ),
    MMA is round(((N-1)*MMA0+Diff)/N),
    nb_setarg(6, State, MMA),
    MMAr is MMA/float(Resolution).


                 /*******************************
                 *            MESSAGES          *
                 *******************************/

:- multifile
    prolog:message/3.

%   Message definitions for the terms printed by this library.

prolog:message(httpd_started_server(Port, Options)) -->
    [ 'Started server at '-[] ],
    http_root(Port, Options).
prolog:message(httpd_stopped_worker(Self, Status)) -->
    [ 'Stopped worker ~p: ~p'-[Self, Status] ].
prolog:message(httpd_restarted_worker(Self)) -->
    [ 'Replaced aborted worker ~p'-[Self] ].
prolog:message(httpd(created_pool(Pool))) -->
    [ 'Created thread-pool ~p of size 10'-[Pool], nl,
      'Create this pool at startup-time or define the hook ', nl,
      'http:create_pool/1 to avoid this message and create a ', nl,
      'pool that fits the usage-profile.'
    ].

%   http_root(+Address, +Options)//
%
%   Emit the location at which the server can be reached.

http_root(Address, Options) -->
    { landing_page(Address, URI, Options) },
    [ '~w'-[URI] ].

%   landing_page(+Address, -URI, +Options)
%
%   URI is the entry page of the server at Address. The port is
%   omitted from the URI if it is the default port for the scheme. A
%   bare port is interpreted as localhost:Port. For a Unix domain
%   socket a descriptive string is returned instead of a URI.

landing_page(Host:Port, URI, Options) :-
    !,
    must_be(atom, Host),
    must_be(integer, Port),
    http_server_property(Port, scheme(Scheme)),
    (   default_port(Scheme, Port)
    ->  format(atom(Base), '~w://~w', [Scheme, Host])
    ;   format(atom(Base), '~w://~w:~w', [Scheme, Host, Port])
    ),
    entry_page(Base, URI, Options).
landing_page(unix_socket(Path), URI, _Options) :-
    !,
    format(string(URI), 'Unix domain socket "~w"', [Path]).
landing_page(Port, URI, Options) :-
    landing_page(localhost:Port, URI, Options).

default_port(http, 80).
default_port(https, 443).

%   entry_page(+Base, -URI, +Options)
%
%   Resolve the entry_page(_) option against Base. If the option is
%   not given, the HTTP location root(.) is used.

entry_page(Base, URI, Options) :-
    option(entry_page(Entry), Options),
    !,
    uri_resolve(Entry, Base, URI).
entry_page(Base, URI, _) :-
    http_absolute_location(root(.), Entry, []),
    uri_resolve(Entry, Base, URI).
/** <module> Threaded HTTP server

Most code doesn't need to use this directly; instead use
library(http/http_server), which combines this library with the typical
HTTP libraries that most servers need.

This library defines the HTTP server frontend of choice for SWI-Prolog.
It is based on the multi-threading capabilities of SWI-Prolog and thus
exploits multiple cores to serve requests concurrently. The server
scales well and can cooperate with library(thread_pool) to control the
number of concurrent requests of a given type. For example, it can be
configured to handle 200 file download requests concurrently, 2
requests that potentially use a lot of memory and 8 requests that use a
lot of CPU resources.

On Unix systems, this library can be combined with
library(http/http_unix_daemon) to realise a proper Unix service process
that creates a web server at port 80, runs under a specific account,
optionally detaches from the controlling terminal, etc.

Combined with library(http/http_ssl_plugin) from the SSL package, this
library can be used to create an HTTPS server. See
`<plbase>/doc/packages/examples/ssl/https` for an example server using
a self-signed SSL certificate.
*/