1/* Part of SWI-Prolog 2 3 Author: Mike Elston 4 Matt Lilley 5 E-mail: matt.s.lilley@gmail.com 6 WWW: http://www.swi-prolog.org 7 Copyright (c) 2014, Mike Elston, Matt Lilley 8 All rights reserved. 9 10 Redistribution and use in source and binary forms, with or without 11 modification, are permitted provided that the following conditions 12 are met: 13 14 1. Redistributions of source code must retain the above copyright 15 notice, this list of conditions and the following disclaimer. 16 17 2. Redistributions in binary form must reproduce the above copyright 18 notice, this list of conditions and the following disclaimer in 19 the documentation and/or other materials provided with the 20 distribution. 21 22 THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 23 "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 24 LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS 25 FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE 26 COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, 27 INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, 28 BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; 29 LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 30 CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 31 LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN 32 ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 33 POSSIBILITY OF SUCH DAMAGE. 34*/ 35 36/* PostgreSQL is a trademark of the PostgreSQL Global Development Group. 37 Microsoft, SQL Server, and Windows are either registered trademarks or 38 trademarks of Microsoft Corporation in the United States and/or other 39 countries. SQLite is a registered trademark of Hipp, Wyrick & Company, 40 Inc in the United States. All other trademarks or registered trademarks 41 are the property of their respective owners. 42*/ 43 44:-module(cql_database, 45 [get_transaction_context/5, 46 odbc_execute_with_statement_cache/7, 47 save_database_event/6, 48 application_value_to_odbc_value/7, 49 odbc_value_to_application_value/5, 50 cql_transaction/3, 51 database_transaction_query_info/3, 52 current_transaction_id/1, 53 transaction_active/0, 54 register_database_connection_details/2, 55 resolve_deadlock/1, 56 database_connection_details/2, 57 odbc_connection_call/3, 58 update_history/14, 59 odbc_cleanup_and_disconnect/1]). 60 61:-use_module(library(cql/cql)). 62:-use_module(library(debug)). 63 64:-dynamic 65 database_connection_details/2. 66:-volatile 67 database_connection_details/2. 68 69:-thread_local 70 database_event/6, 71 transaction_active/0, 72 transaction_context/4, 73 database_transaction_query_info/3. 74 75get_transaction_context(TransactionId, TrxId, AccessToken, TransactionTimestamp, Connection) :- 76 ( transaction_context(TransactionId_, AccessToken_, TransactionTimestamp_, Connection_) -> 77 TransactionId = TransactionId_, 78 TrxId = {null}, 79 AccessToken = AccessToken_, 80 TransactionTimestamp = TransactionTimestamp_, 81 Connection = Connection_ 82 83 ; otherwise -> 84 throw(no_database_transaction_active) 85 ). 86 87 88:-meta_predicate 89 odbc_connection_call( , , ). 90 91:-thread_local 92 % odbc_connection_available(Schema, Connection) 93 odbc_connection_available/2, 94 % odbc_connection_in_use(Schema) 95 odbc_connection_in_use/1. 96 97:-multifile(cql_max_db_connections_hook/1). 98:-multifile(cql:odbc_connection_complete_hook/3). 99odbc_connection_call(Schema, Connection, Goal) :- 100 ( retract(odbc_connection_available(Schema, Connection)) -> % Get a connection from the pool 101 assert(odbc_connection_in_use(Schema)), 102 setup_call_cleanup(true, 103 Goal, 104 ( odbc_end_transaction(Connection, rollback), % Ensure connections in the pool have no pending results dangling 105 retract(odbc_connection_in_use(Schema)), 106 assert(odbc_connection_available(Schema, Connection)))) % Put connection back in the pool 107 ; aggregate_all(r(count), odbc_connection_in_use(Schema), r(N)), 108 ( cql_max_db_connections_hook(MaxDbConnections)-> 109 true 110 ; otherwise-> 111 MaxDbConnections = 10 112 ), 113 N >= MaxDbConnections -> 114 thread_self(ThreadId), 115 116 cql_error(too_many_schema_connections, 'Too many connections on ~w: Maximum is ~w', [ThreadId, MaxDbConnections]) 117 118 ; database_connection_details(Schema, ConnectionDetails) -> 119 ( ConnectionDetails = driver_string(DriverString) -> 120 true 121 122 ; ConnectionDetails = dsn(Dsn, Username, Password) -> 123 gethostname(HostName), 124 format(atom(DriverString), 'DSN=~w;UID=~w;PWD=~w;WSID=~w;', [Dsn, Username, Password, HostName]) 125 126 ; ConnectionDetails = dsn(Dsn) -> 127 gethostname(HostName), 128 format(atom(DriverString), 'DSN=~w;WSID=~w;', [Dsn, HostName]) 129 130 ; otherwise -> 131 throw(invalid_connection_details(ConnectionDetails)) 132 ), 133 134 odbc_connect(-, 135 Connection, 136 [driver_string(DriverString), 137 silent(true), 138 null({null}), 139 auto_commit(false), 140 wide_column_threshold(8000), 141 mars(true)]), % In theory this is not needed following bug-5181 but see comment in predicate description 142 143 thread_at_exit(odbc_cleanup_and_disconnect(Connection)), 144 assert(odbc_connection_available(Schema, Connection)), 145 146 ignore(cql:odbc_connection_complete_hook(Schema, ConnectionDetails, Connection)), 147 odbc_connection_call(Schema, Connection, Goal) 148 149 ; otherwise -> 150 throw(no_database_connection_details) 151 ).
To avoid leaks, all exiting threads with database connections should call this. See odbc_connection_call/2 (thread_at_exit/1)
Note that any exception inside odbc_cleanup_and_disconnect/1 will result in it not going on to the next step.
We log exceptions to the event log because exceptions at this level are associated with the server process crashing and the SE log is unlikely to capture anything useful.
168odbc_cleanup_and_disconnect(Connection) :- 169 catch_all(odbc_cleanup_and_disconnect_1(Connection), 170 E, 171 ( thread_self(ThreadId), 172 cql_log([], error, '[~w] odbc_cleanup_and_disconnect/1 : ~w', [ThreadId, E]))). 173 174odbc_cleanup_and_disconnect_1(Connection) :- 175 thread_self(ThreadId), 176 debug(odbc_cleanup, 'BEFORE [~w] : ~w', [ThreadId, odbc_end_transaction(Connection, rollback)]), 177 odbc_end_transaction(Connection, rollback), 178 debug(odbc_cleanup, 'AFTER [~w] : ~w', [ThreadId, odbc_end_transaction(Connection, rollback)]), 179 forall(retract(cached_prepared_odbc_statement(_, _, Connection, _, _, CachedStatement, _)), 180 ( debug(odbc_cleanup, 'BEFORE [~w] : ~w', [ThreadId, odbc_free_statement(CachedStatement)]), 181 odbc_free_statement(CachedStatement), 182 debug(odbc_cleanup, 'AFTER [~w] : ~w', [ThreadId, odbc_free_statement(CachedStatement)]) 183 ) 184 ), 185 retractall(lru_key(_)), 186 retractall(lru_statement(_)), 187 debug(odbc_cleanup, 'BEFORE [~w] : ~w', [ThreadId, odbc_disconnect(Connection)]), 188 odbc_disconnect(Connection), 189 debug(odbc_cleanup, 'AFTER [~w] : ~w', [ThreadId, odbc_disconnect(Connection)]), 190 % Get rid of these last so there is some evidence if odbc_disconnect/1 does not work 191 retractall(sql_server_spid(Connection, _, _, _)).
195:-thread_local 196 % cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, FileName, LineNumber, Statement, MutexId) 197 % Note that we need OdbcParameterDataTypes in here so we can look up the correct statement. Consider: 198 % "SELECT * FROM some_table WHERE some_column = ?" 199 % This can be compiled with varchar(4) as the datatype, to get statement S, 200 % If we then want to do a query where user_id = 'ERIC' we are going to get a runtime type error. 201 % Ordinarily this isn't a problem because the domain is well-established at compile-time, but this 202 % is not the case when dealing with dynamic tables, specifically #cql_in. 203 cached_prepared_odbc_statement/7. 204 205:-thread_local 206 lru_statement/1, 207 statement_locked/1, 208 lru_key/1. 209 210max_lru_size(4000). 211 212% This is called with the current statement locked 213evict_cache_entries(_, 0):- !. 214evict_cache_entries(Key, N):- 215 N > 0, 216 retract(lru_statement(MutexId)), 217 % This statement cannot be locked unless the cache size is extremely small, since we JUST cycled the current statement to the bottom of the stack 218 ( statement_locked(MutexId)-> 219 % Just do nothing in this case. We will get it next time. Besides, it is very unlikely to happen 220 true 221 ; otherwise-> 222 thread_self(ThreadId), 223 retract(cached_prepared_odbc_statement(Sql, _, _, _, _, Statement, MutexId)), 224 odbc_free_statement(Statement), 225 debug(odbc_statement_cache, 'CACHE-EVICT [~w] ~w : ~@', [ThreadId, Statement, trimmed_sql(Sql, 80)]), 226 flag(Key, X, X-1) 227 ), 228 NN is N-1, 229 !, 230 evict_cache_entries(Key, NN). 231 232 233odbc_execute_with_statement_cache(Connection, _, _, Sql, OdbcParameters, OdbcParameterDataTypes, Row) :- 234 cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, _, _, Statement, MutexId), 235 !, 236 setup_call_cleanup(assert(statement_locked(MutexId)), 237 ( thread_self(ThreadId), 238 retract(lru_statement(MutexId)), 239 assertz(lru_statement(MutexId)), 240 debug(odbc_statement_cache, 'CACHE-HIT [~w] ~w : ~@', [ThreadId, Statement, trimmed_sql(Sql, 80)]), 241 odbc_execute_with_statistics(Statement, OdbcParameters, OdbcParameterDataTypes, Row) 242 ), 243 retract(statement_locked(MutexId))). 244 245odbc_execute_with_statement_cache(Connection, FileName, LineNumber, Sql, OdbcParameters, OdbcParameterDataTypes, Row) :- 246 thread_self(ThreadId), 247 debug(odbc_statement_cache, 'CACHE-MISS [~w] : ~@', [ThreadId, trimmed_sql(Sql, 80)]), 248 odbc_prepare(Connection, Sql, OdbcParameterDataTypes, Statement, []), 249 gensym(statement_lock_, MutexId), 250 ( lru_key(Key)-> 251 true 252 ; otherwise-> 253 gensym(lru_key_, Key), 254 assert(lru_key(Key)) 255 ), 256 setup_call_cleanup(assert(statement_locked(MutexId)), 257 ( assertz(cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, FileName, LineNumber, Statement, MutexId)), 258 assertz(lru_statement(MutexId)), 259 max_lru_size(MaxSize), 260 flag(Key, CacheSize, CacheSize+1), 261 ( CacheSize >= MaxSize-> 262 Delta is CacheSize - MaxSize, 263 evict_cache_entries(Key, Delta) 264 ; otherwise-> 265 true 266 ), 267 flag(Key, Z, Z), 268 debug(odbc_statement_cache, 'CACHE-STORE [~w] ~w, ~w : ~@', [ThreadId, Statement, MutexId, trimmed_sql(Sql, 60)]), 269 odbc_execute_with_statistics(Statement, OdbcParameters, OdbcParameterDataTypes, Row) 270 ), 271 retract(statement_locked(MutexId))).
284save_database_event(AccessToken, % + 285 EventType, % + 286 Schema, % + 287 TableName, % + 288 PrimaryKeyColumnName, % + 289 PrimaryKey) :- % + 290 ( database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey) -> 291 % No point storing an event more than once 292 true 293 ; otherwise-> 294 assert(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey)) 295 ). 296 297 298:-meta_predicate(cql_transaction( , , )). 299 300cql_transaction(Schema, AccessToken, Goal):- 301 thread_self(ThreadId), 302 setup_call_cleanup(assert(transaction_active), 303 cql_transaction_1(Schema, AccessToken, Goal, DatabaseEventsSet), 304 ( retractall(database_transaction_query_info(ThreadId, _, _)), 305 retractall(transaction_context(_, _, _, _)), 306 retractall(database_event(_, _, _, _, _, _)), 307 flag(transaction_count, Count, Count+1), 308 retractall(transaction_active))), % Removed last so if transaction_active succeeds while executing Goal then the other facts are still available to Goal 309 cql_process_database_events(DatabaseEventsSet). 310 311cql_transaction_1(Schema, AccessToken, Goal, DatabaseEventsSet):- 312 ( transaction_context(ExistingTransactionId, _, _, _) -> 313 throw(database_transaction_already_in_progress(ExistingTransactionId)) 314 ; otherwise -> 315 true 316 ), 317 resolve_deadlock(cql_transaction_2(Schema, AccessToken, Goal, DatabaseEventsSet)). 318 319cql_transaction_2(Schema, AccessToken, Goal, DatabaseEventsSet) :- 320 odbc_connection_call(Schema, 321 Connection, 322 ( ( dbms(Schema, 'Microsoft SQL Server')-> 323 odbc_query(Connection, 'SELECT CONVERT(VARCHAR(36), NEWID())', row(TransactionId)) 324 ; dbms(Schema, 'PostgreSQL') -> 325 odbc_query(Connection, 'SELECT uuid_generate_v1()', row(TransactionId)) 326 ; dbms(Schema, 'SQLite') -> 327 odbc_query(Connection, 'SELECT substr(u,1,8)||\'-\'||substr(u,9,4)||\'-4\'||substr(u,13,3)||\'-\'||v||substr(u,17,3)||\'-\'||substr(u,21,12) from (select lower(hex(randomblob(16))) as u, substr(\'89ab\',abs(random()) % 4 + 1, 1) as v)', row(TransactionId)) 328 ; otherwise -> 329 throw(no_dbms_for_schema(Schema)) 330 ), 331 dbms(Schema, DBMS), 332 store_transaction_info(AccessToken, Connection, DBMS, Goal), 333 get_time(ExecutionTime), 334 assert(transaction_context(TransactionId, AccessToken, ExecutionTime, Connection)), 335 336 ( cql_transaction_3(Goal, Connection, TransactionId, AccessToken, DatabaseEventsSet) -> 337 true 338 ; otherwise -> 339 % odbc_connection_call/3 always rolls back so no need for explicit rollback here 340 log_transaction_state(AccessToken, TransactionId, transaction_rolled_back_on_logic_failure), 341 fail 342 ))). 343 344 345:-meta_predicate 346 cql_transaction_3( , , , , ). 347 348cql_transaction_3(Goal, Connection, TransactionId, AccessToken, DatabaseEventsSet) :- 349 log_transaction_state(AccessToken, TransactionId, transaction_starting), 350 catch(Goal, E, Error = E), 351 % Note that this previously did a setof/3. This reorders events, which breaks event consolidation 352 findall(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey), 353 retract(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey)), 354 DatabaseEvents), 355 % list_to_set/2 is NlogN and preserves order 356 list_to_set(DatabaseEvents, DatabaseEventsSet), 357 ( var(Error) -> 358 odbc_end_transaction(Connection, commit), 359 log_transaction_state(AccessToken, TransactionId, transaction_committed) 360 361 ; otherwise -> 362 % odbc_connection_call/3 always rolls back so no need for explicit rollback here 363 log_transaction_state(AccessToken, TransactionId, transaction_rolled_back_on_error), 364 throw(Error) 365 ).
Use this only when you are sure Goal has no non-database side effects (assert/retract, file operations etc)
Originally developed for use inside cql_transaction/3, resolve_deadlock/1 can also be used to ensure non-transactional operations can resolve deadlocks.
381:-meta_predicate 382 resolve_deadlock( ). 383 384resolve_deadlock(Goal) :- 385 thread_self(ThreadId), 386 flag(transaction_count, InitialCount, InitialCount), 387 388 maximum_deadlock_retries(MaximumDeadlockRetries), 389 between(1, MaximumDeadlockRetries, RetryCount), % BTP for deadlock retry 390 391 ( RetryCount >= MaximumDeadlockRetries -> 392 cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_FAILED\tCOULD NOT RESOLVE deadlock on thread \'~w\'. Goal: ~w', [ThreadId, Goal]), 393 throw(deadlock_retry_count_exceeded(MaximumDeadlockRetries)) 394 395 ; RetryCount > 1 -> 396 % Check if another transaction has completed. Note complete means committed -or- rolled back 397 flag(transaction_count, CurrentCount, CurrentCount), 398 ( CurrentCount =:= InitialCount -> 399 Flag = no_other_transaction_completed 400 ; otherwise -> 401 Flag = another_transaction_completed 402 ) 403 404 ; otherwise -> 405 Flag = no_deadlock 406 ), 407 408 ( Flag == no_other_transaction_completed -> 409 Delay is ( 2 << RetryCount) / 1000.0, % Exponential backoff up to 2.048s 410 sleep(Delay), 411 cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_ATTEMPT\tRETRYING deadlocked transaction on thread \'~w\'(attempt ~w). Initiated by EXPIRY of RANDOM WAIT of ~w seconds.', [ThreadId, RetryCount, Delay]) 412 413 ; Flag == another_transaction_completed -> 414 cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_ATTEMPT\tRETRYING deadlocked transaction on thread \'~w\' (attempt ~w). Initiated by COMPLETION of a TRANSACTION on another thread.', [ThreadId, RetryCount]) 415 ; otherwise -> 416 true 417 ), 418 419 catch_all((Goal -> 420 LogicalStatus = 1 421 ; otherwise -> 422 true 423 ), 424 error(odbc('40001', _, _), _), 425 ( cql_log([debug(deadlocks)], warning, 'DEADLOCK_DETECTED\tThread \'~w\' selected as DEADLOCK VICTIM. Goal: ~w', [ThreadId, Goal]), 426 retractall(database_transaction_query_info(ThreadId, _, _)), 427 retractall(transaction_context(_, _, _, _)), 428 retractall(database_event(_, _, _, _, _, _)), 429 fail)), 430 ( RetryCount > 1 -> 431 cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLVED\tdeadlocked transaction on thread \'~w\' RESOLVED (attempt ~w).', [ThreadId, RetryCount]) 432 433 ; otherwise -> 434 true 435 ), 436 !, % Don't want to backtrack into the deadlock retry between/3 when Goal fails 437 LogicalStatus == 1.
444maximum_deadlock_retries(10). 445 446% log_transaction_state(+AccessToken, +TransactionId, +TransactionState) 447 448log_transaction_state(AccessToken, TransactionId, TransactionState) :- 449 cql_access_token_to_user_id(AccessToken, UserId), 450 upcase_atom(TransactionState, TransactionStateUc), 451 cql_log([], informational, '\t~p\t~p\t~p', [UserId, TransactionId, TransactionStateUc]).
460register_database_connection_details(Schema, ConnectionDetails) :- 461 assert(database_connection_details(Schema, ConnectionDetails)). 462 463 464update_history(Schema, TableName, AttributeName, PrimaryKeyAttributeName, PrimaryKeyValue, ApplicationValueBefore, ApplicationValueAfter, AccessToken, Info, TransactionId, TransactionTimestamp, ThreadId, Connection, Goal):- 465 ignore(cql_update_history_hook(Schema, TableName, AttributeName, PrimaryKeyAttributeName, PrimaryKeyValue, ApplicationValueBefore, ApplicationValueAfter, AccessToken, Info, TransactionId, TransactionTimestamp, ThreadId, Connection, Goal)).
470:-multifile(cql:application_value_to_odbc_value_hook/7). 471application_value_to_odbc_value(ApplicationValue, OdbcDataType, Schema, TableName, ColumnName, Qualifiers, OdbcValue):- 472 ( var(ApplicationValue)-> 473 throw(instantiation_error(ApplicationValue)) 474 ; cql:application_value_to_odbc_value_hook(OdbcDataType, Schema, TableName, ColumnName, Qualifiers, ApplicationValue, OdbcValue)-> 475 true 476 ; otherwise-> 477 OdbcValue = ApplicationValue 478 ). 479 480 481odbc_numeric_precision_limit(27).
485:-multifile(cql:odbc_value_to_application_value_hook/7). 486odbc_value_to_application_value(Schema, TableSpec, ColumnName, OdbcValue, ApplicationValue):- 487 cql_data_type(Schema, TableSpec, ColumnName, DatabaseDataType, _, _, _, Domain, _, _), 488 !, 489 ( cql:odbc_value_to_application_value_hook(DatabaseDataType, Schema, TableSpec, ColumnName, Domain, OdbcValue, ApplicationValue)-> 490 true 491 ; otherwise-> 492 ApplicationValue = OdbcValue 493 ). 494 495% FIXME: What to do about this? 496catch_all(A, B, C):- catch(A, B, C). 497 498 499:-multifile(cql:process_database_events/1). 500cql_process_database_events(Events):- 501 ignore(cql:process_database_events(Events)). 502 503:-multifile(cql:cql_transaction_info_hook/5). 504store_transaction_info(AccessToken, Connection, DBMS, Goal):- 505 ( cql:cql_transaction_info_hook(AccessToken, Connection, DBMS, Goal, Info)-> 506 true 507 ; otherwise-> 508 Info = {null} 509 ), 510 thread_self(ThreadId), 511 assert(database_transaction_query_info(ThreadId, Goal, Info)).
515current_transaction_id(TransactionId):-
516 transaction_context(TransactionId, _, _, _)