/*  Part of SWI-Prolog

    Author:        Mike Elston
                   Matt Lilley
    E-mail:        matt.s.lilley@gmail.com
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2014, Mike Elston, Matt Lilley
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

/*  PostgreSQL is a trademark of the PostgreSQL Global Development Group.
    Microsoft, SQL Server, and Windows are either registered trademarks or
    trademarks of Microsoft Corporation in the United States and/or other
    countries. SQLite is a registered trademark of Hipp, Wyrick & Company,
    Inc in the United States. All other trademarks or registered trademarks
    are the property of their respective owners.
*/

:-module(cql_database,
         [get_transaction_context/5,
          odbc_execute_with_statement_cache/7,
          save_database_event/6,
          application_value_to_odbc_value/7,
          odbc_value_to_application_value/5,
          cql_transaction/3,
          database_transaction_query_info/3,
          current_transaction_id/1,
          transaction_active/0,
          register_database_connection_details/2,
          resolve_deadlock/1,
          database_connection_details/2,
          odbc_connection_call/3,
          update_history/14,
          odbc_cleanup_and_disconnect/1]).

:-use_module(library(cql/cql)).
:-use_module(library(debug)).

:-dynamic
        database_connection_details/2.
:-volatile
        database_connection_details/2.

% Per-thread transaction state.
:-thread_local
        database_event/6,
        transaction_active/0,
        transaction_context/4,
        database_transaction_query_info/3.

%%      get_transaction_context(?TransactionId, ?TrxId, ?AccessToken, ?TransactionTimestamp, ?Connection)
%
%       Unify the arguments with the transaction_context/4 fact recorded for
%       the calling thread.  TrxId is always unified with {null}.
%
%       The fact is selected first and the arguments are unified afterwards,
%       so calling this with an argument that does not match the stored
%       context fails rather than throwing.
%
%       @throws no_database_transaction_active if the calling thread has no
%               transaction_context/4 fact.

get_transaction_context(TransactionId, TrxId, AccessToken, TransactionTimestamp, Connection) :-
        ( transaction_context(TransactionId_, AccessToken_, TransactionTimestamp_, Connection_) ->
            TransactionId = TransactionId_,
            TrxId = {null},
            AccessToken = AccessToken_,
            TransactionTimestamp = TransactionTimestamp_,
            Connection = Connection_

        ; otherwise ->
            throw(no_database_transaction_active)
        ).


% The third argument is a goal that odbc_connection_call/3 calls, hence the 0
% specifier; the other two arguments are plain (non-meta) arguments.
:-meta_predicate
        odbc_connection_call(+, -, 0).

:-thread_local
        % odbc_connection_available(Schema, Connection)
        odbc_connection_available/2,
        % odbc_connection_in_use(Schema)
        odbc_connection_in_use/1.

:-multifile(cql_max_db_connections_hook/1).
:-multifile(cql:odbc_connection_complete_hook/3).
%       odbc_connection_call(+Schema, -Connection, :Goal)
%
%       Call Goal with Connection bound to an ODBC connection for Schema
%       taken from the calling thread's pool (odbc_connection_available/2).
%
%       * If a pooled connection exists it is removed from the pool while Goal
%         runs.  When Goal finishes the connection is rolled back and returned
%         to the pool.
%       * Otherwise, if the number of connections in use for Schema has
%         reached the limit (cql_max_db_connections_hook/1, default 10),
%         cql_error/3 is called with too_many_schema_connections.
%       * Otherwise a new connection is opened from the registered
%         database_connection_details/2, added to the pool, and the predicate
%         calls itself again so that the pooled branch runs Goal.
%
%       Throws no_database_connection_details if nothing is registered for
%       Schema, and invalid_connection_details(Details) if the registered term
%       is not driver_string/1, dsn/3 or dsn/1.

odbc_connection_call(Schema, Connection, Goal) :-
        ( retract(odbc_connection_available(Schema, Connection)) ->
            % Get a connection from the pool
            assert(odbc_connection_in_use(Schema)),
            setup_call_cleanup(true,
                               Goal,
                               ( % Ensure connections in the pool have no pending results dangling
                                 odbc_end_transaction(Connection, rollback),
                                 retract(odbc_connection_in_use(Schema)),
                                 % Put connection back in the pool
                                 assert(odbc_connection_available(Schema, Connection))))

        ; odbc_connection_call_limit_reached(Schema, Limit) ->
            thread_self(Self),
            cql_error(too_many_schema_connections, 'Too many connections on ~w: Maximum is ~w', [Self, Limit])

        ; database_connection_details(Schema, Details) ->
            odbc_connection_call_driver_string(Details, DriverString),
            odbc_connect(-,
                         Connection,
                         [driver_string(DriverString),
                          silent(true),
                          null({null}),
                          auto_commit(false),
                          wide_column_threshold(8000),
                          mars(true)]), % In theory this is not needed following bug-5181 but see comment in predicate description

            thread_at_exit(odbc_cleanup_and_disconnect(Connection)),
            assert(odbc_connection_available(Schema, Connection)),

            ignore(cql:odbc_connection_complete_hook(Schema, Details, Connection)),
            % The new connection is now in the pool, so this takes the first branch
            odbc_connection_call(Schema, Connection, Goal)

        ; otherwise ->
            throw(no_database_connection_details)
        ).

% True when the number of connections in use for Schema has reached Limit.
% Limit comes from cql_max_db_connections_hook/1 when that hook succeeds and
% is 10 otherwise.
odbc_connection_call_limit_reached(Schema, Limit) :-
        aggregate_all(count, odbc_connection_in_use(Schema), InUse),
        ( cql_max_db_connections_hook(Limit)->
            true
        ; otherwise->
            Limit = 10
        ),
        InUse >= Limit.

% Translate registered connection details into an ODBC driver string.
odbc_connection_call_driver_string(Details, DriverString) :-
        ( Details = driver_string(DriverString) ->
            true

        ; Details = dsn(Dsn, Username, Password) ->
            gethostname(Host),
            format(atom(DriverString), 'DSN=~w;UID=~w;PWD=~w;WSID=~w;', [Dsn, Username, Password, Host])

        ; Details = dsn(Dsn) ->
            gethostname(Host),
            format(atom(DriverString), 'DSN=~w;WSID=~w;', [Dsn, Host])

        ; otherwise ->
            throw(invalid_connection_details(Details))
        ).
%       To avoid leaks, all exiting threads with database connections should call this. See odbc_connection_call/3 (thread_at_exit/1).
%
%       Note that an exception raised by any step inside odbc_cleanup_and_disconnect/1 means the remaining steps are not carried out.
%
%       We log exceptions to the event log because exceptions at this level are associated with the server process crashing and the SE log is unlikely to capture anything useful.
% odbc_cleanup_and_disconnect(+Connection)
%
% Run odbc_cleanup_and_disconnect_1/1 for Connection.  Any exception it raises
% is caught and reported through cql_log/4 together with the id of the calling
% thread, so this predicate itself does not propagate the exception.

odbc_cleanup_and_disconnect(Connection) :-
        catch_all(odbc_cleanup_and_disconnect_1(Connection),
                  Exception,
                  ( thread_self(Self),
                    cql_log([], error, '[~w] odbc_cleanup_and_disconnect/1 : ~w', [Self, Exception]))).

% Roll back Connection, free every prepared statement cached for it, clear
% the thread's LRU bookkeeping, disconnect, and finally drop the
% sql_server_spid/4 facts for the connection.  Each ODBC step is bracketed by
% BEFORE/AFTER messages on the odbc_cleanup debug topic.

odbc_cleanup_and_disconnect_1(Connection) :-
        thread_self(Self),
        odbc_cleanup_traced_step(Self, odbc_end_transaction(Connection, rollback)),
        forall(retract(cached_prepared_odbc_statement(_, _, Connection, _, _, Statement, _)),
               odbc_cleanup_traced_step(Self, odbc_free_statement(Statement))),
        retractall(lru_key(_)),
        retractall(lru_statement(_)),
        odbc_cleanup_traced_step(Self, odbc_disconnect(Connection)),
        % Get rid of these last so there is some evidence if odbc_disconnect/1 does not work
        retractall(sql_server_spid(Connection, _, _, _)).

% Call Step, printing a debug message on topic odbc_cleanup before and after.
odbc_cleanup_traced_step(ThreadId, Step) :-
        debug(odbc_cleanup, 'BEFORE [~w] : ~w', [ThreadId, Step]),
        call(Step),
        debug(odbc_cleanup, 'AFTER [~w] : ~w', [ThreadId, Step]).
:-thread_local
        % cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, FileName, LineNumber, Statement, MutexId)
        % Note that we need OdbcParameterDataTypes in here so we can look up the correct statement. Consider:
        % "SELECT * FROM some_table WHERE some_column = ?"
        % This can be compiled with varchar(4) as the datatype, to get statement S,
        % If we then want to do a query where user_id = 'ERIC' we are going to get a runtime type error.
        % Ordinarily this isn't a problem because the domain is well-established at compile-time, but this
        % is not the case when dealing with dynamic tables, specifically #cql_in.
        cached_prepared_odbc_statement/7.

:-thread_local
        lru_statement/1,
        statement_locked/1,
        lru_key/1.

% Size limit used by odbc_execute_with_statement_cache/7 when deciding how
% many cached statements to evict.
max_lru_size(4000).

% evict_cache_entries(+Key, +N)
%
% Visit the N least recently used entries (the first N lru_statement/1 facts).
% Each visited entry that is not locked has its prepared statement freed, its
% cached_prepared_odbc_statement/7 fact removed and the flag Key decremented.
%
% A locked entry (statement_locked/1) is still being executed and cannot be
% freed.  It is put back at the most-recently-used end of the LRU list and the
% flag is left alone.  Dropping its lru_statement/1 fact instead would leave a
% cached statement with no LRU entry, and odbc_execute_with_statement_cache/7
% retracts that entry on every cache hit.
%
% This is called with the current statement locked
evict_cache_entries(_, 0):- !.
evict_cache_entries(Key, N):-
        N > 0,
        retract(lru_statement(MutexId)),
        ( statement_locked(MutexId)->
            % Cannot free a statement that is in use: keep it tracked so a later eviction can get it
            assertz(lru_statement(MutexId))
        ; otherwise->
            thread_self(_0ThreadId),
            retract(cached_prepared_odbc_statement(_0Sql, _, _, _, _, Statement, MutexId)),
            odbc_free_statement(Statement),
            debug(odbc_statement_cache, 'CACHE-EVICT [~w] ~w : ~@',
                  [_0ThreadId, Statement, trimmed_sql(_0Sql, 80)]),
            flag(Key, X, X-1)
        ),
        NN is N-1,
        !,
        evict_cache_entries(Key, NN).


% odbc_execute_with_statement_cache(+Connection, +FileName, +LineNumber, +Sql, +OdbcParameters, +OdbcParameterDataTypes, -Row)
%
% Execute Sql on Connection through a per-thread cache of prepared statements
% keyed on (Sql, OdbcParameterDataTypes, Connection).
%
% Cache hit:  the statement is moved to the most-recently-used end of the LRU
%             list and executed.
% Cache miss: the statement is prepared, cached and executed.  If the cache
%             then holds more than max_lru_size/1 entries, the least recently
%             used ones are evicted.
%
% In both cases the statement is marked statement_locked/1 for as long as the
% execution is active, so evict_cache_entries/2 will not free it.

odbc_execute_with_statement_cache(Connection, _, _, Sql, OdbcParameters, OdbcParameterDataTypes, Row) :-
        cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, _, _, Statement, MutexId),
        !,
        setup_call_cleanup(assert(statement_locked(MutexId)),
                           ( thread_self(ThreadId),
                             % Move to the MRU end.  A missing LRU entry must not make the query itself fail, so
                             % the retract is optional and the entry is (re)created either way.
                             ignore(retract(lru_statement(MutexId))),
                             assertz(lru_statement(MutexId)),
                             debug(odbc_statement_cache, 'CACHE-HIT [~w] ~w : ~@', [ThreadId, Statement, trimmed_sql(Sql, 80)]),
                             odbc_execute_with_statistics(Statement, OdbcParameters, OdbcParameterDataTypes, Row)
                           ),
                           retract(statement_locked(MutexId))).

odbc_execute_with_statement_cache(Connection, FileName, LineNumber, Sql, OdbcParameters, OdbcParameterDataTypes, Row) :-
        thread_self(ThreadId),
        debug(odbc_statement_cache, 'CACHE-MISS [~w] : ~@', [ThreadId, trimmed_sql(Sql, 80)]),
        odbc_prepare(Connection, Sql, OdbcParameterDataTypes, Statement, []),
        gensym(statement_lock_, MutexId),
        % The cache size is kept in a flag whose key is generated once per thread
        ( lru_key(Key)->
            true
        ; otherwise->
            gensym(lru_key_, Key),
            assert(lru_key(Key))
        ),
        setup_call_cleanup(assert(statement_locked(MutexId)),
                           ( assertz(cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, FileName, LineNumber, Statement, MutexId)),
                             assertz(lru_statement(MutexId)),
                             max_lru_size(MaxSize),
                             % CacheSize is the size *before* this insertion
                             flag(Key, CacheSize, CacheSize+1),
                             ( CacheSize >= MaxSize->
                                 % The cache now holds CacheSize+1 entries; evict down to MaxSize
                                 Delta is CacheSize + 1 - MaxSize,
                                 evict_cache_entries(Key, Delta)
                             ; otherwise->
                                 true
                             ),
                             debug(odbc_statement_cache, 'CACHE-STORE [~w] ~w, ~w : ~@', [ThreadId, Statement, MutexId, trimmed_sql(Sql, 60)]),
                             odbc_execute_with_statistics(Statement, OdbcParameters, OdbcParameterDataTypes, Row)
                           ),
                           retract(statement_locked(MutexId))).
% save_database_event(+AccessToken, +EventType, +Schema, +TableName, +PrimaryKeyColumnName, +PrimaryKey)
%
% Record a database_event/6 fact for the calling thread unless an identical
% one is already recorded.

save_database_event(AccessToken,                  % +
                    EventType,                    % +
                    Schema,                       % +
                    TableName,                    % +
                    PrimaryKeyColumnName,         % +
                    PrimaryKey) :-                % +
        Event = database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey),
        ( call(Event) ->
            % No point storing an event more than once
            true
        ; otherwise->
            assert(Event)
        ).


% Goal is called, so it takes the 0 specifier; Schema and AccessToken are plain arguments.
:-meta_predicate(cql_transaction(+, +, 0)).

% cql_transaction(+Schema, +AccessToken, :Goal)
%
% Run Goal as a database transaction on Schema.  transaction_active/0 holds
% while the transaction runs.  When it finishes (success, failure or
% exception) the thread's transaction facts are removed and the global
% transaction_count flag is incremented.  After a successful transaction the
% collected database events are passed to cql_process_database_events/1.
%
% Throws database_transaction_already_in_progress(TransactionId) if the
% calling thread already has a transaction context.  That check is made
% *before* the cleanup handler is installed: the cleanup retracts
% transaction_context/4, database_event/6 and transaction_active/0
% unconditionally, so running it for a rejected nested call would destroy the
% state of the enclosing transaction.

cql_transaction(Schema, AccessToken, Goal):-
        ( transaction_context(ExistingTransactionId, _, _, _) ->
            throw(database_transaction_already_in_progress(ExistingTransactionId))
        ; otherwise ->
            true
        ),
        thread_self(ThreadId),
        setup_call_cleanup(assert(transaction_active),
                           cql_transaction_1(Schema, AccessToken, Goal, DatabaseEventsSet),
                           ( retractall(database_transaction_query_info(ThreadId, _, _)),
                             retractall(transaction_context(_, _, _, _)),
                             retractall(database_event(_, _, _, _, _, _)),
                             flag(transaction_count, Count, Count+1),
                             retractall(transaction_active))),   % Removed last so if transaction_active succeeds while executing Goal then the other facts are still available to Goal
        cql_process_database_events(DatabaseEventsSet).

% Reject nested transactions, then run the transaction under resolve_deadlock/1.
cql_transaction_1(Schema, AccessToken, Goal, DatabaseEventsSet):-
        ( transaction_context(ExistingTransactionId, _, _, _) ->
            throw(database_transaction_already_in_progress(ExistingTransactionId))
        ; otherwise ->
            true
        ),
        resolve_deadlock(cql_transaction_2(Schema, AccessToken, Goal, DatabaseEventsSet)).

% cql_transaction_2(+Schema, +AccessToken, :Goal, -DatabaseEventsSet)
%
% Obtain a connection for Schema, generate a transaction id on it, record the
% transaction info and context for the calling thread, then run Goal via
% cql_transaction_3/5.  If that fails, the logic failure is logged and this
% predicate fails.

cql_transaction_2(Schema, AccessToken, Goal, DatabaseEventsSet) :-
        odbc_connection_call(Schema,
                             Connection,
                             ( cql_transaction_new_id(Schema, Connection, TransactionId),
                               dbms(Schema, DBMS),
                               store_transaction_info(AccessToken, Connection, DBMS, Goal),
                               get_time(ExecutionTime),
                               assert(transaction_context(TransactionId, AccessToken, ExecutionTime, Connection)),

                               ( cql_transaction_3(Goal, Connection, TransactionId, AccessToken, DatabaseEventsSet) ->
                                   true
                               ; otherwise ->
                                   % odbc_connection_call/3 always rolls back so no need for explicit rollback here
                                   log_transaction_state(AccessToken, TransactionId, transaction_rolled_back_on_logic_failure),
                                   fail
                               ))).

% Ask the database behind Schema for a fresh transaction id, using the query
% appropriate to its DBMS.  Throws no_dbms_for_schema(Schema) when dbms/2
% names none of the three handled systems.
cql_transaction_new_id(Schema, Connection, TransactionId) :-
        ( dbms(Schema, 'Microsoft SQL Server')->
            odbc_query(Connection, 'SELECT CONVERT(VARCHAR(36), NEWID())', row(TransactionId))
        ; dbms(Schema, 'PostgreSQL') ->
            odbc_query(Connection, 'SELECT uuid_generate_v1()', row(TransactionId))
        ; dbms(Schema, 'SQLite') ->
            odbc_query(Connection, 'SELECT substr(u,1,8)||\'-\'||substr(u,9,4)||\'-4\'||substr(u,13,3)||\'-\'||v||substr(u,17,3)||\'-\'||substr(u,21,12) from (select lower(hex(randomblob(16))) as u, substr(\'89ab\',abs(random()) % 4 + 1, 1) as v)', row(TransactionId))
        ; otherwise ->
            throw(no_dbms_for_schema(Schema))
        ).


% The first argument is the goal to run, hence the 0 specifier.
:-meta_predicate
        cql_transaction_3(0, +, +, +, -).

% cql_transaction_3(:Goal, +Connection, +TransactionId, +AccessToken, -DatabaseEventsSet)
%
% Run Goal.  Afterwards the database_event/6 facts recorded for AccessToken
% are collected (and removed) into DatabaseEventsSet, in order and without
% duplicates.  If Goal raised no exception the transaction is committed on
% Connection; otherwise the exception is re-thrown after logging.  Fails if
% Goal fails.

cql_transaction_3(Goal, Connection, TransactionId, AccessToken, DatabaseEventsSet) :-
        log_transaction_state(AccessToken, TransactionId, transaction_starting),
        % Remember any exception in Error so the events can be collected before it is re-thrown
        catch(Goal, E, Error = E),
        % Note that this previously did a setof/3. This reorders events, which breaks event consolidation
        findall(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey),
                retract(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey)),
                DatabaseEvents),
        % list_to_set/2 is NlogN and preserves order
        list_to_set(DatabaseEvents, DatabaseEventsSet),
        ( var(Error) ->
            odbc_end_transaction(Connection, commit),
            log_transaction_state(AccessToken, TransactionId, transaction_committed)

        ; otherwise ->
            % odbc_connection_call/3 always rolls back so no need for explicit rollback here
            log_transaction_state(AccessToken, TransactionId, transaction_rolled_back_on_error),
            throw(Error)
        ).
%       Use this only when you are sure Goal has no non-database side effects (assert/retract, file operations, etc.).
%
%       Originally developed for use inside cql_transaction/3, resolve_deadlock/1 can also be used to ensure non-transactional operations can resolve deadlocks.
% The single argument is the goal to (re)try, hence the 0 specifier.
:-meta_predicate
        resolve_deadlock(0).

% resolve_deadlock(:Goal)
%
% Call Goal once.  If it raises error(odbc('40001', _, _), _), which this
% predicate reports as the thread having been selected as deadlock victim, the
% thread's transaction facts are cleared and Goal is tried again, up to
% maximum_deadlock_retries/1 attempts in total.
%
% Before a retry the predicate either continues immediately (the global
% transaction_count flag shows a transaction completed since the first
% attempt) or sleeps for an exponentially growing delay.
%
% Succeeds if Goal succeeds (first solution only) and fails if Goal fails.
% Throws deadlock_retry_count_exceeded(Max) when the attempt counter reaches
% the maximum; Goal is not run on that last count.

resolve_deadlock(Goal) :-
        thread_self(ThreadId),
        flag(transaction_count, InitialCount, InitialCount),

        maximum_deadlock_retries(MaximumDeadlockRetries),
        between(1, MaximumDeadlockRetries, RetryCount),  % BTP for deadlock retry

        ( RetryCount >= MaximumDeadlockRetries ->
            cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_FAILED\tCOULD NOT RESOLVE deadlock on thread \'~w\'. Goal: ~w', [ThreadId, Goal]),
            throw(deadlock_retry_count_exceeded(MaximumDeadlockRetries))

        ; RetryCount > 1 ->
            % Check if another transaction has completed.  Note complete means committed -or- rolled back
            flag(transaction_count, CurrentCount, CurrentCount),
            ( CurrentCount =:= InitialCount ->
                Flag = no_other_transaction_completed
            ; otherwise ->
                Flag = another_transaction_completed
            )

        ; otherwise ->
            Flag = no_deadlock
        ),

        ( Flag == no_other_transaction_completed ->
            % Exponential backoff: 2^(RetryCount+1) ms.  This branch is only reached for
            % 1 < RetryCount < MaximumDeadlockRetries, i.e. 8ms up to 1.024s with the default of 10.
            Delay is ( 2 << RetryCount) / 1000.0,
            sleep(Delay),
            cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_ATTEMPT\tRETRYING deadlocked transaction on thread \'~w\'(attempt ~w). Initiated by EXPIRY of RANDOM WAIT of ~w seconds.', [ThreadId, RetryCount, Delay])

        ; Flag == another_transaction_completed ->
            cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_ATTEMPT\tRETRYING deadlocked transaction on thread \'~w\' (attempt ~w). Initiated by COMPLETION of a TRANSACTION on another thread.', [ThreadId, RetryCount])
        ; otherwise ->
            true
        ),

        % LogicalStatus stays unbound when Goal fails, so that failure can be reported after the cut below
        catch_all((Goal ->
                     LogicalStatus = 1
                  ; otherwise ->
                     true
                  ),
                  error(odbc('40001', _, _), _),
                  ( cql_log([debug(deadlocks)], warning, 'DEADLOCK_DETECTED\tThread \'~w\' selected as DEADLOCK VICTIM. Goal: ~w', [ThreadId, Goal]),
                    retractall(database_transaction_query_info(ThreadId, _, _)),
                    retractall(transaction_context(_, _, _, _)),
                    retractall(database_event(_, _, _, _, _, _)),
                    % Failing here backtracks into between/3 above, which starts the next attempt
                    fail)),
        ( RetryCount > 1 ->
            cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLVED\tdeadlocked transaction on thread \'~w\' RESOLVED (attempt ~w).', [ThreadId, RetryCount])

        ; otherwise ->
            true
        ),
        !,   % Don't want to backtrack into the deadlock retry between/3 when Goal fails
        LogicalStatus == 1.
% Upper bound on the attempt counter used by resolve_deadlock/1.
maximum_deadlock_retries(10).

% log_transaction_state(+AccessToken, +TransactionId, +TransactionState)
%
% Write an informational log line holding the user id obtained from
% AccessToken, the transaction id and the upper-cased transaction state.

log_transaction_state(AccessToken, TransactionId, TransactionState) :-
        upcase_atom(TransactionState, StateLabel),
        cql_access_token_to_user_id(AccessToken, User),
        cql_log([], informational, '\t~p\t~p\t~p', [User, TransactionId, StateLabel]).
% register_database_connection_details(+Schema, +ConnectionDetails)
%
% Add a database_connection_details/2 fact associating ConnectionDetails
% with Schema.

register_database_connection_details(Schema, ConnectionDetails) :-
        Fact = database_connection_details(Schema, ConnectionDetails),
        assert(Fact).


% update_history(+Schema, +TableName, +AttributeName, +PrimaryKeyAttributeName, +PrimaryKeyValue,
%                +ApplicationValueBefore, +ApplicationValueAfter, +AccessToken, +Info, +TransactionId,
%                +TransactionTimestamp, +ThreadId, +Connection, +Goal)
%
% Pass all fourteen arguments to cql_update_history_hook/14.  Always succeeds,
% whether or not the hook does.

update_history(Schema,
               TableName,
               AttributeName,
               PrimaryKeyAttributeName,
               PrimaryKeyValue,
               ApplicationValueBefore,
               ApplicationValueAfter,
               AccessToken,
               Info,
               TransactionId,
               TransactionTimestamp,
               ThreadId,
               Connection,
               Goal):-
        ignore(cql_update_history_hook(Schema,
                                       TableName,
                                       AttributeName,
                                       PrimaryKeyAttributeName,
                                       PrimaryKeyValue,
                                       ApplicationValueBefore,
                                       ApplicationValueAfter,
                                       AccessToken,
                                       Info,
                                       TransactionId,
                                       TransactionTimestamp,
                                       ThreadId,
                                       Connection,
                                       Goal)).
:-multifile(cql:application_value_to_odbc_value_hook/7).

% application_value_to_odbc_value(+ApplicationValue, +OdbcDataType, +Schema, +TableName, +ColumnName, +Qualifiers, -OdbcValue)
%
% Convert an application value to the value handed to ODBC.  The conversion is
% delegated to cql:application_value_to_odbc_value_hook/7; when the hook does
% not succeed the value is passed through unchanged.
%
% Throws instantiation_error(ApplicationValue) if ApplicationValue is unbound.

application_value_to_odbc_value(ApplicationValue, _OdbcDataType, _Schema, _TableName, _ColumnName, _Qualifiers, _OdbcValue):-
        var(ApplicationValue),
        !,
        throw(instantiation_error(ApplicationValue)).
application_value_to_odbc_value(ApplicationValue, OdbcDataType, Schema, TableName, ColumnName, Qualifiers, OdbcValue):-
        ( cql:application_value_to_odbc_value_hook(OdbcDataType, Schema, TableName, ColumnName, Qualifiers, ApplicationValue, OdbcValue)->
            true
        ; otherwise->
            OdbcValue = ApplicationValue
        ).


odbc_numeric_precision_limit(27).
:-multifile(cql:odbc_value_to_application_value_hook/7).

% odbc_value_to_application_value(+Schema, +TableSpec, +ColumnName, +OdbcValue, -ApplicationValue)
%
% Convert a value returned by ODBC to an application value.  The column's
% database data type and domain are looked up with cql_data_type/10 and given
% to cql:odbc_value_to_application_value_hook/7; when the hook does not
% succeed the ODBC value is returned unchanged.  Fails if cql_data_type/10 has
% no entry for the column.

odbc_value_to_application_value(Schema, TableSpec, ColumnName, OdbcValue, ApplicationValue):-
        cql_data_type(Schema, TableSpec, ColumnName, DataType, _, _, _, Domain, _, _),
        !,
        ( cql:odbc_value_to_application_value_hook(DataType, Schema, TableSpec, ColumnName, Domain, OdbcValue, ApplicationValue)->
            true
        ; otherwise->
            ApplicationValue = OdbcValue
        ).

% FIXME: What to do about this?
% catch_all(:Goal, ?Catcher, :Recovery): currently a plain alias for catch/3.
catch_all(Goal, Catcher, Recovery):-
        catch(Goal, Catcher, Recovery).


:-multifile(cql:process_database_events/1).

% Hand the events of a finished transaction to cql:process_database_events/1.
% Always succeeds, whether or not the hook does.
cql_process_database_events(Events):-
        ignore(cql:process_database_events(Events)).

:-multifile(cql:cql_transaction_info_hook/5).

% store_transaction_info(+AccessToken, +Connection, +DBMS, +Goal)
%
% Record database_transaction_query_info(ThreadId, Goal, Info) for the calling
% thread.  Info comes from cql:cql_transaction_info_hook/5 and is {null} when
% the hook does not succeed.

store_transaction_info(AccessToken, Connection, DBMS, Goal):-
        ( cql:cql_transaction_info_hook(AccessToken, Connection, DBMS, Goal, Info)->
            true
        ; otherwise->
            Info = {null}
        ),
        thread_self(Self),
        assert(database_transaction_query_info(Self, Goal, Info)).
516current_transaction_id(TransactionId):-
517 transaction_context(TransactionId, _, _, _)