/*  Part of SWI-Prolog

    Author:        Mike Elston
                   Matt Lilley
    E-mail:        matt.s.lilley@gmail.com
    WWW:           http://www.swi-prolog.org
    Copyright (c)  2014, Mike Elston, Matt Lilley
    All rights reserved.

    Redistribution and use in source and binary forms, with or without
    modification, are permitted provided that the following conditions
    are met:

    1. Redistributions of source code must retain the above copyright
       notice, this list of conditions and the following disclaimer.

    2. Redistributions in binary form must reproduce the above copyright
       notice, this list of conditions and the following disclaimer in
       the documentation and/or other materials provided with the
       distribution.

    THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
    "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
    LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
    FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
    COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
    INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
    BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
    LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
    CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
    LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
    ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
    POSSIBILITY OF SUCH DAMAGE.
*/

/*  PostgreSQL is a trademark of the PostgreSQL Global Development Group.
    Microsoft, SQL Server, and Windows are either registered trademarks or
    trademarks of Microsoft Corporation in the United States and/or other
    countries. SQLite is a registered trademark of Hipp, Wyrick & Company,
    Inc in the United States. All other trademarks or registered trademarks
    are the property of their respective owners.
*/

:-module(cql_database,
         [get_transaction_context/5,
          odbc_execute_with_statement_cache/7,
          save_database_event/6,
          application_value_to_odbc_value/7,
          odbc_value_to_application_value/5,
          cql_transaction/3,
          database_transaction_query_info/3,
          current_transaction_id/1,
          transaction_active/0,
          register_database_connection_details/2,
          resolve_deadlock/1,
          database_connection_details/2,
          odbc_connection_call/3,
          update_history/14,
          odbc_cleanup_and_disconnect/1]).

:-use_module(library(cql/cql)).

% Connection details are registered at runtime and are not kept in saved states.
:-dynamic
        database_connection_details/2.
:-volatile
        database_connection_details/2.

% Per-thread transaction state.
:-thread_local
        database_event/6,
        transaction_active/0,
        transaction_context/4,
        database_transaction_query_info/3.

%%      get_transaction_context(-TransactionId, -TrxId, -AccessToken, -TransactionTimestamp, -Connection)
%
%       Unify the arguments with the values of the first transaction_context/4
%       fact of the calling thread.  TrxId is always unified with {null}.
%
%       The fact is looked up with fresh variables and unified afterwards, so
%       calling this with arguments that do not match the stored context fails
%       rather than throws.
%
%       @throws no_database_transaction_active if the thread has no
%               transaction_context/4 fact

get_transaction_context(TransactionId, TrxId, AccessToken, TransactionTimestamp, Connection) :-
        ( transaction_context(TransactionId_, AccessToken_, TransactionTimestamp_, Connection_) ->
            TransactionId = TransactionId_,
            TrxId = {null},
            AccessToken = AccessToken_,
            TransactionTimestamp = TransactionTimestamp_,
            Connection = Connection_

        ; otherwise ->
            throw(no_database_transaction_active)
        ).


% Goal (third argument) is called, so it must be module-sensitive (0).
% Schema is an input and Connection is bound by odbc_connection_call/3.
:-meta_predicate
        odbc_connection_call(+, -, 0).

% Per-thread connection pool bookkeeping.
:-thread_local
        % odbc_connection_available(Schema, Connection)
        odbc_connection_available/2,
        % odbc_connection_in_use(Schema)
        odbc_connection_in_use/1.

:-multifile(cql_max_db_connections_hook/1).
:-multifile(cql:odbc_connection_complete_hook/3).
%%      odbc_connection_call(+Schema, -Connection, :Goal)
%
%       Run Goal with Connection bound to an ODBC connection for Schema taken
%       from the calling thread's pool.
%
%       * If a pooled connection is available it is moved from "available" to
%         "in use" for the duration of Goal.  When Goal finishes the
%         connection is rolled back and returned to the pool.
%       * Otherwise, if the number of in-use connections for Schema has
%         reached the limit (cql_max_db_connections_hook/1, default 10),
%         cql_error/3 is called with too_many_schema_connections.
%       * Otherwise a new connection is opened from the registered
%         database_connection_details/2, added to the pool, and the predicate
%         calls itself again to pick it up.
%
%       Taking the connection out of the pool and marking it in use both
%       happen in the Setup argument of setup_call_cleanup/3.  Setup runs
%       protected from asynchronous signals, so an interrupt can no longer
%       arrive after the connection has left the pool but before the cleanup
%       handler that returns it has been installed.
%
%       @throws invalid_connection_details(ConnectionDetails) for an
%               unrecognised connection-details term
%       @throws no_database_connection_details if nothing is registered for
%               Schema

odbc_connection_call(Schema, Connection, Goal) :-
        ( odbc_connection_available(Schema, Connection) ->                       % A pooled connection exists
            setup_call_cleanup(( retract(odbc_connection_available(Schema, Connection)), % Get a connection from the pool
                                 assert(odbc_connection_in_use(Schema))
                               ),
                               Goal,
                               ( odbc_end_transaction(Connection, rollback),      % Ensure connections in the pool have no pending results dangling
                                 retract(odbc_connection_in_use(Schema)),
                                 assert(odbc_connection_available(Schema, Connection))))  % Put connection back in the pool

        ; aggregate_all(r(count), odbc_connection_in_use(Schema), r(N)),
          ( cql_max_db_connections_hook(MaxDbConnections)->
              true
          ; otherwise->
              MaxDbConnections = 10
          ),
          N >= MaxDbConnections ->
            thread_self(ThreadId),

            cql_error(too_many_schema_connections, 'Too many connections on ~w: Maximum is ~w', [ThreadId, MaxDbConnections])

        ; database_connection_details(Schema, ConnectionDetails) ->
            % Build the ODBC driver string from whichever form was registered
            ( ConnectionDetails = driver_string(DriverString) ->
                true

            ; ConnectionDetails = dsn(Dsn, Username, Password) ->
                gethostname(HostName),
                format(atom(DriverString), 'DSN=~w;UID=~w;PWD=~w;WSID=~w;', [Dsn, Username, Password, HostName])

            ; ConnectionDetails = dsn(Dsn) ->
                gethostname(HostName),
                format(atom(DriverString), 'DSN=~w;WSID=~w;', [Dsn, HostName])

            ; otherwise ->
                throw(invalid_connection_details(ConnectionDetails))
            ),

            odbc_connect(-,
                         Connection,
                         [driver_string(DriverString),
                          silent(true),
                          null({null}),
                          auto_commit(false),
                          wide_column_threshold(8000),
                          mars(true)]),   % In theory this is not needed following bug-5181 but see comment in predicate description

            % Make sure the connection is released when this thread exits
            thread_at_exit(odbc_cleanup_and_disconnect(Connection)),
            assert(odbc_connection_available(Schema, Connection)),

            ignore(cql:odbc_connection_complete_hook(Schema, ConnectionDetails, Connection)),
            % Retry: the first branch now finds the connection just pooled
            odbc_connection_call(Schema, Connection, Goal)

        ; otherwise ->
            throw(no_database_connection_details)
        ).
To avoid leaks, every exiting thread that holds database connections should call this. odbc_connection_call/3 registers it for each new connection via thread_at_exit/1.
Note that an exception raised by any cleanup step inside odbc_cleanup_and_disconnect_1/1 aborts the remaining steps; odbc_cleanup_and_disconnect/1 catches the exception and logs it.
We log exceptions to the event log because exceptions at this level are associated with the server process crashing and the SE log is unlikely to capture anything useful.
%%      odbc_cleanup_and_disconnect(+Connection)
%
%       Release everything held for Connection.  Any exception raised by the
%       cleanup is caught and written to the log together with the id of the
%       calling thread.

odbc_cleanup_and_disconnect(Connection) :-
        catch_all(odbc_cleanup_and_disconnect_1(Connection),
                  Exception,
                  ( thread_self(Self),
                    cql_log([], error, '[~w] odbc_cleanup_and_disconnect/1 : ~w', [Self, Exception]))).

%       The cleanup proper: roll back, free every cached prepared statement
%       of Connection, clear the LRU bookkeeping, then disconnect.

odbc_cleanup_and_disconnect_1(Connection) :-
        thread_self(Self),
        odbc_cleanup_step(Self, odbc_end_transaction(Connection, rollback)),
        forall(retract(cached_prepared_odbc_statement(_, _, Connection, _, _, Statement, _)),
               odbc_cleanup_step(Self, odbc_free_statement(Statement))),
        retractall(lru_key(_)),
        retractall(lru_statement(_)),
        odbc_cleanup_step(Self, odbc_disconnect(Connection)),
        % Get rid of these last so there is some evidence if odbc_disconnect/1 does not work
        retractall(sql_server_spid(Connection, _, _, _)).

%       Run one cleanup step, bracketed by odbc_cleanup debug messages.

odbc_cleanup_step(Self, Step) :-
        debug(odbc_cleanup, 'BEFORE [~w] : ~w', [Self, Step]),
        call(Step),
        debug(odbc_cleanup, 'AFTER [~w] : ~w', [Self, Step]).
:-thread_local
        % cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, FileName, LineNumber, Statement, MutexId)
        % Note that we need OdbcParameterDataTypes in here so we can look up the correct statement. Consider:
        %   "SELECT * FROM some_table WHERE some_column = ?"
        % This can be compiled with varchar(4) as the datatype, to get statement S,
        % If we then want to do a query where user_id = 'ERIC' we are going to get a runtime type error.
        % Ordinarily this isn't a problem because the domain is well-established at compile-time, but this
        % is not the case when dealing with dynamic tables, specifically #cql_in.
        cached_prepared_odbc_statement/7.

:-thread_local
        % lru_statement(MutexId): one fact per cached statement, least recently used first
        lru_statement/1,
        % statement_locked(MutexId): the statement is currently being executed
        statement_locked/1,
        % lru_key(Key): name of the flag that counts the cached statements
        lru_key/1.

%%      max_lru_size(-Size)
%
%       Number of prepared statements the per-thread cache should hold.

max_lru_size(4000).

%%      evict_cache_entries(+Key, +N)
%
%       Make up to N eviction attempts, starting with the least recently used
%       statement.  Each evicted statement is freed and the counter flag Key
%       is decremented.  This is called with the current statement locked.
%
%       A statement that is locked is not freed.  It is put back at the
%       most-recently-used end of the LRU list so that it stays known to the
%       cache: dropping it from the list would make it impossible to evict
%       later and would make the next cache hit on it fail.
%
%       Eviction is best effort: if the LRU list runs out before N attempts
%       have been made the predicate still succeeds, so an over-estimated N
%       cannot make the query that triggered the eviction fail.

evict_cache_entries(_, 0):- !.
evict_cache_entries(Key, N):-
        N > 0,
        retract(lru_statement(MutexId)),
        % This statement cannot be locked unless the cache size is extremely small, since we JUST cycled the current statement to the bottom of the stack
        ( statement_locked(MutexId)->
            % In use: keep it in the LRU list (as most recently used) and try again next time
            assertz(lru_statement(MutexId))
        ; otherwise->
            thread_self(ThreadId),
            retract(cached_prepared_odbc_statement(Sql, _, _, _, _, Statement, MutexId)),
            odbc_free_statement(Statement),
            debug(odbc_statement_cache, 'CACHE-EVICT [~w] ~w : ~@', [ThreadId, Statement, trimmed_sql(Sql, 80)]),
            flag(Key, X, X-1)
        ),
        NN is N-1,
        !,
        evict_cache_entries(Key, NN).
evict_cache_entries(_, _).      % Nothing (more) to evict
%%      odbc_execute_with_statement_cache(+Connection, +FileName, +LineNumber, +Sql, +OdbcParameters, +OdbcParameterDataTypes, -Row)
%
%       Execute Sql on Connection through a per-thread cache of prepared
%       statements, keyed on Sql, OdbcParameterDataTypes and Connection.
%
%       * Cache hit: the statement is moved to the most-recently-used end of
%         the LRU list and executed.
%       * Cache miss: the statement is prepared, stored, and the least
%         recently used statements are evicted if the cache now holds more
%         than max_lru_size/1 entries.
%
%       While it is executing, the statement is marked with statement_locked/1
%       so that eviction triggered by a nested query does not free it.

odbc_execute_with_statement_cache(Connection, _, _, Sql, OdbcParameters, OdbcParameterDataTypes, Row) :-
        cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, _, _, Statement, MutexId),
        !,
        setup_call_cleanup(assert(statement_locked(MutexId)),
                           ( thread_self(ThreadId),
                             % Move to the most-recently-used end.  A missing LRU entry must not make the query fail.
                             ignore(retract(lru_statement(MutexId))),
                             assertz(lru_statement(MutexId)),
                             debug(odbc_statement_cache, 'CACHE-HIT [~w] ~w : ~@', [ThreadId, Statement, trimmed_sql(Sql, 80)]),
                             odbc_execute_with_statistics(Statement, OdbcParameters, OdbcParameterDataTypes, Row)
                           ),
                           retract(statement_locked(MutexId))).

odbc_execute_with_statement_cache(Connection, FileName, LineNumber, Sql, OdbcParameters, OdbcParameterDataTypes, Row) :-
        thread_self(ThreadId),
        debug(odbc_statement_cache, 'CACHE-MISS [~w] : ~@', [ThreadId, trimmed_sql(Sql, 80)]),
        odbc_prepare(Connection, Sql, OdbcParameterDataTypes, Statement, []),
        gensym(statement_lock_, MutexId),
        % The cache size is kept in a flag whose name is created on first use
        ( lru_key(Key)->
            true
        ; otherwise->
            gensym(lru_key_, Key),
            assert(lru_key(Key))
        ),
        setup_call_cleanup(assert(statement_locked(MutexId)),
                           ( assertz(cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, FileName, LineNumber, Statement, MutexId)),
                             assertz(lru_statement(MutexId)),
                             max_lru_size(MaxSize),
                             flag(Key, CacheSize, CacheSize+1),
                             % CacheSize is the size BEFORE this statement was added
                             NewSize is CacheSize + 1,
                             ( NewSize > MaxSize->
                                 % Evict the whole excess so the cache settles at MaxSize, not MaxSize+1
                                 Excess is NewSize - MaxSize,
                                 evict_cache_entries(Key, Excess)
                             ; otherwise->
                                 true
                             ),
                             debug(odbc_statement_cache, 'CACHE-STORE [~w] ~w, ~w : ~@', [ThreadId, Statement, MutexId, trimmed_sql(Sql, 60)]),
                             odbc_execute_with_statistics(Statement, OdbcParameters, OdbcParameterDataTypes, Row)
                           ),
                           retract(statement_locked(MutexId))).
%%      save_database_event(+AccessToken, +EventType, +Schema, +TableName, +PrimaryKeyColumnName, +PrimaryKey)
%
%       Record a database event for the calling thread as a database_event/6
%       fact, unless an identical event has already been recorded.

save_database_event(AccessToken,                   % +
                    EventType,                     % +
                    Schema,                        % +
                    TableName,                     % +
                    PrimaryKeyColumnName,          % +
                    PrimaryKey) :-                 % +
        ( database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey) ->
            % No point storing an event more than once
            true
        ; otherwise->
            assert(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey))
        ).


% Goal (third argument) is eventually called, so it is module-sensitive (0).
:-meta_predicate(cql_transaction(+, +, 0)).

%%      cql_transaction(+Schema, +AccessToken, :Goal)
%
%       Run Goal inside a database transaction on Schema.
%
%       transaction_active/0 is asserted for the duration of the transaction.
%       Afterwards the thread's transaction state (query info, transaction
%       context, pending events) is cleared and the global transaction_count
%       flag is incremented, whether Goal succeeded, failed or threw.  If the
%       transaction succeeded, the collected database events are then handed
%       to cql_process_database_events/1.
%
%       @throws database_transaction_already_in_progress(TransactionId) if the
%               thread already has a transaction context

cql_transaction(Schema, AccessToken, Goal):-
        thread_self(ThreadId),
        setup_call_cleanup(assert(transaction_active),
                           cql_transaction_1(Schema, AccessToken, Goal, DatabaseEventsSet),
                           ( retractall(database_transaction_query_info(ThreadId, _, _)),
                             retractall(transaction_context(_, _, _, _)),
                             retractall(database_event(_, _, _, _, _, _)),
                             flag(transaction_count, Count, Count+1),
                             retractall(transaction_active))),  % Removed last so if transaction_active succeeds while executing Goal then the other facts are still available to Goal
        cql_process_database_events(DatabaseEventsSet).

%       Refuse to nest transactions, then run the transaction under deadlock
%       resolution.

cql_transaction_1(Schema, AccessToken, Goal, DatabaseEventsSet):-
        ( transaction_context(ExistingTransactionId, _, _, _) ->
            throw(database_transaction_already_in_progress(ExistingTransactionId))
        ; otherwise ->
            true
        ),
        resolve_deadlock(cql_transaction_2(Schema, AccessToken, Goal, DatabaseEventsSet)).
%       cql_transaction_2(+Schema, +AccessToken, :Goal, -DatabaseEventsSet)
%
%       Obtain a connection for Schema, ask the database for a new
%       transaction id (a UUID, generated in a DBMS-specific way), record the
%       transaction context for this thread and run Goal through
%       cql_transaction_3/5.  Fails, after logging, if Goal fails.
%
%       Throws no_dbms_for_schema(Schema) if Schema is not on one of the
%       supported DBMSs.

cql_transaction_2(Schema, AccessToken, Goal, DatabaseEventsSet) :-
        odbc_connection_call(Schema,
                             Connection,
                             ( ( dbms(Schema, 'Microsoft SQL Server')->
                                   odbc_query(Connection, 'SELECT CONVERT(VARCHAR(36), NEWID())', row(TransactionId))
                               ; dbms(Schema, 'PostgreSQL') ->
                                   odbc_query(Connection, 'SELECT uuid_generate_v1()', row(TransactionId))
                               ; dbms(Schema, 'SQLite') ->
                                   odbc_query(Connection, 'SELECT substr(u,1,8)||\'-\'||substr(u,9,4)||\'-4\'||substr(u,13,3)||\'-\'||v||substr(u,17,3)||\'-\'||substr(u,21,12) from (select lower(hex(randomblob(16))) as u, substr(\'89ab\',abs(random()) % 4 + 1, 1) as v)', row(TransactionId))
                               ; otherwise ->
                                   throw(no_dbms_for_schema(Schema))
                               ),
                               dbms(Schema, DBMS),
                               store_transaction_info(AccessToken, Connection, DBMS, Goal),
                               get_time(ExecutionTime),
                               assert(transaction_context(TransactionId, AccessToken, ExecutionTime, Connection)),

                               ( cql_transaction_3(Goal, Connection, TransactionId, AccessToken, DatabaseEventsSet) ->
                                   true
                               ; otherwise ->
                                   % odbc_connection_call/3 always rolls back so no need for explicit rollback here
                                   log_transaction_state(AccessToken, TransactionId, transaction_rolled_back_on_logic_failure),
                                   fail
                               ))).


% Goal (first argument) is called, so it is module-sensitive (0).
% DatabaseEventsSet is the only output.
:-meta_predicate
        cql_transaction_3(0, +, +, +, -).

%       cql_transaction_3(:Goal, +Connection, +TransactionId, +AccessToken, -DatabaseEventsSet)
%
%       Run Goal, collect (and remove) the database events recorded for
%       AccessToken, then commit if Goal raised no exception.  If Goal raised
%       an exception it is re-thrown after logging; the rollback is left to
%       odbc_connection_call/3.  Fails if Goal fails.

cql_transaction_3(Goal, Connection, TransactionId, AccessToken, DatabaseEventsSet) :-
        log_transaction_state(AccessToken, TransactionId, transaction_starting),
        % Remember an exception instead of propagating it, so the events are still collected
        catch(Goal, E, Error = E),
        % Note that this previously did a setof/3. This reorders events, which breaks event consolidation
        findall(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey),
                retract(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey)),
                DatabaseEvents),
        % list_to_set/2 is NlogN and preserves order
        list_to_set(DatabaseEvents, DatabaseEventsSet),
        ( var(Error) ->
            odbc_end_transaction(Connection, commit),
            log_transaction_state(AccessToken, TransactionId, transaction_committed)

        ; otherwise ->
            % odbc_connection_call/3 always rolls back so no need for explicit rollback here
            log_transaction_state(AccessToken, TransactionId, transaction_rolled_back_on_error),
            throw(Error)
        ).
Use this only when you are sure Goal has no non-database side effects (assert/retract, file operations etc)
Originally developed for use inside cql_transaction/3, resolve_deadlock/1 can also be used to ensure non-transactional operations can resolve deadlocks.
% Goal is called, so it is module-sensitive (0).
:-meta_predicate
        resolve_deadlock(0).

%%      resolve_deadlock(:Goal)
%
%       Call Goal, retrying it when the database reports a deadlock (an ODBC
%       error with SQLSTATE 40001).  Succeeds if Goal succeeds and fails if
%       Goal fails; exceptions other than the deadlock error are propagated.
%
%       Attempts are numbered from 1 by between/3.  Before a retry:
%
%       * if the global transaction_count flag has not changed since
%         resolve_deadlock/1 was entered, sleep for an exponentially
%         increasing delay;
%       * if it has changed (some transaction completed, i.e. committed or
%         rolled back), retry immediately.
%
%       When the attempt number reaches maximum_deadlock_retries/1 no further
%       attempt is made, so Goal is called at most one time fewer than that
%       limit.
%
%       @throws deadlock_retry_count_exceeded(MaximumDeadlockRetries) when the
%               retry limit is reached

resolve_deadlock(Goal) :-
        thread_self(ThreadId),
        flag(transaction_count, InitialCount, InitialCount),

        maximum_deadlock_retries(MaximumDeadlockRetries),
        between(1, MaximumDeadlockRetries, RetryCount),   % BTP for deadlock retry

        ( RetryCount >= MaximumDeadlockRetries ->
            cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_FAILED\tCOULD NOT RESOLVE deadlock on thread \'~w\'. Goal: ~w', [ThreadId, Goal]),
            throw(deadlock_retry_count_exceeded(MaximumDeadlockRetries))

        ; RetryCount > 1 ->
            % Check if another transaction has completed.  Note complete means committed -or- rolled back
            flag(transaction_count, CurrentCount, CurrentCount),
            ( CurrentCount =:= InitialCount ->
                Flag = no_other_transaction_completed
            ; otherwise ->
                Flag = another_transaction_completed
            )

        ; otherwise ->
            Flag = no_deadlock
        ),

        ( Flag == no_other_transaction_completed ->
            % Exponential backoff: 0.008s on attempt 2, doubling per attempt.
            % With the default limit of 10 the last sleeping attempt is 9, i.e. at most 1.024s.
            Delay is ( 2 << RetryCount) / 1000.0,
            sleep(Delay),
            cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_ATTEMPT\tRETRYING deadlocked transaction on thread \'~w\'(attempt ~w). Initiated by EXPIRY of RANDOM WAIT of ~w seconds.', [ThreadId, RetryCount, Delay])

        ; Flag == another_transaction_completed ->
            cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_ATTEMPT\tRETRYING deadlocked transaction on thread \'~w\' (attempt ~w). Initiated by COMPLETION of a TRANSACTION on another thread.', [ThreadId, RetryCount])
        ; otherwise ->
            true
        ),

        % LogicalStatus records whether Goal succeeded; it is tested after the cut below.
        % On a deadlock the handler clears this thread's transaction state and fails,
        % which backtracks into between/3 for the next attempt.
        catch_all((Goal ->
                     LogicalStatus = 1
                  ; otherwise ->
                     true
                  ),
                  error(odbc('40001', _, _), _),
                  ( cql_log([debug(deadlocks)], warning, 'DEADLOCK_DETECTED\tThread \'~w\' selected as DEADLOCK VICTIM. Goal: ~w', [ThreadId, Goal]),
                    retractall(database_transaction_query_info(ThreadId, _, _)),
                    retractall(transaction_context(_, _, _, _)),
                    retractall(database_event(_, _, _, _, _, _)),
                    fail)),
        ( RetryCount > 1 ->
            cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLVED\tdeadlocked transaction on thread \'~w\' RESOLVED (attempt ~w).', [ThreadId, RetryCount])

        ; otherwise ->
            true
        ),
        !,   % Don't want to backtrack into the deadlock retry between/3 when Goal fails
        LogicalStatus == 1.
%%      maximum_deadlock_retries(-Limit)
%
%       Attempt number at which resolve_deadlock/1 gives up.

maximum_deadlock_retries(10).

%%      log_transaction_state(+AccessToken, +TransactionId, +TransactionState)
%
%       Write an informational log line holding the user id that belongs to
%       AccessToken, the TransactionId and the upper-cased TransactionState.

log_transaction_state(AccessToken, TransactionId, TransactionState) :-
        cql_access_token_to_user_id(AccessToken, User),
        upcase_atom(TransactionState, StateUpperCase),
        LogArguments = [User, TransactionId, StateUpperCase],
        cql_log([], informational, '\t~p\t~p\t~p', LogArguments).
%%      register_database_connection_details(+Schema, +ConnectionDetails)
%
%       Remember ConnectionDetails as the way to connect to Schema by adding
%       a database_connection_details/2 fact.

register_database_connection_details(Schema, ConnectionDetails) :-
        assertz(database_connection_details(Schema, ConnectionDetails)).


%%      update_history(+Schema, +TableName, +AttributeName, +PrimaryKeyAttributeName, +PrimaryKeyValue, +ApplicationValueBefore, +ApplicationValueAfter, +AccessToken, +Info, +TransactionId, +TransactionTimestamp, +ThreadId, +Connection, +Goal)
%
%       Pass a change on to cql_update_history_hook/14.  Always succeeds (at
%       most once), whether or not the hook succeeds.

update_history(Schema, TableName, AttributeName, PrimaryKeyAttributeName, PrimaryKeyValue, ApplicationValueBefore, ApplicationValueAfter, AccessToken, Info, TransactionId, TransactionTimestamp, ThreadId, Connection, Goal):-
        ( cql_update_history_hook(Schema, TableName, AttributeName, PrimaryKeyAttributeName, PrimaryKeyValue, ApplicationValueBefore, ApplicationValueAfter, AccessToken, Info, TransactionId, TransactionTimestamp, ThreadId, Connection, Goal) ->
            true
        ; true
        ).
:-multifile(cql:application_value_to_odbc_value_hook/7).

%%      application_value_to_odbc_value(+ApplicationValue, +OdbcDataType, +Schema, +TableName, +ColumnName, +Qualifiers, -OdbcValue)
%
%       Convert ApplicationValue to the value handed to ODBC.  The first
%       solution of cql:application_value_to_odbc_value_hook/7 decides the
%       result; without one the value is passed through unchanged.
%
%       @throws instantiation_error(ApplicationValue) if ApplicationValue is
%               unbound

application_value_to_odbc_value(ApplicationValue, _, _, _, _, _, _):-
        var(ApplicationValue),
        !,
        throw(instantiation_error(ApplicationValue)).
application_value_to_odbc_value(ApplicationValue, OdbcDataType, Schema, TableName, ColumnName, Qualifiers, OdbcValue):-
        cql:application_value_to_odbc_value_hook(OdbcDataType, Schema, TableName, ColumnName, Qualifiers, ApplicationValue, OdbcValue),
        !.
application_value_to_odbc_value(ApplicationValue, _, _, _, _, _, OdbcValue):-
        OdbcValue = ApplicationValue.


%%      odbc_numeric_precision_limit(-Limit)

odbc_numeric_precision_limit(27).
:-multifile(cql:odbc_value_to_application_value_hook/7).

%%      odbc_value_to_application_value(+Schema, +TableSpec, +ColumnName, +OdbcValue, -ApplicationValue)
%
%       Convert a value read through ODBC to the application value.  The
%       column's data type and domain are looked up with cql_data_type/10 and
%       given to cql:odbc_value_to_application_value_hook/7; if the hook has
%       no solution the value is passed through unchanged.  Fails if the
%       column is unknown to cql_data_type/10.

odbc_value_to_application_value(Schema, TableSpec, ColumnName, OdbcValue, ApplicationValue):-
        cql_data_type(Schema, TableSpec, ColumnName, DatabaseDataType, _, _, _, Domain, _, _),
        !,
        ( cql:odbc_value_to_application_value_hook(DatabaseDataType, Schema, TableSpec, ColumnName, Domain, OdbcValue, ApplicationValue) ->
            true
        ; ApplicationValue = OdbcValue
        ).

% FIXME: What to do about this?
%       catch_all(:Goal, ?Catcher, :Recovery): currently just catch/3.
catch_all(Goal, Catcher, Recovery):-
        catch(Goal, Catcher, Recovery).


:-multifile(cql:process_database_events/1).

%       Hand Events to cql:process_database_events/1; always succeeds.
cql_process_database_events(Events):-
        ( cql:process_database_events(Events) ->
            true
        ; true
        ).

:-multifile(cql:cql_transaction_info_hook/5).

%       store_transaction_info(+AccessToken, +Connection, +DBMS, +Goal)
%
%       Record database_transaction_query_info(ThreadId, Goal, Info) for the
%       calling thread.  Info comes from cql:cql_transaction_info_hook/5, or
%       is {null} if the hook has no solution.

store_transaction_info(AccessToken, Connection, DBMS, Goal):-
        transaction_info_or_null(AccessToken, Connection, DBMS, Goal, Info),
        thread_self(Self),
        assert(database_transaction_query_info(Self, Goal, Info)).

transaction_info_or_null(AccessToken, Connection, DBMS, Goal, Info):-
        cql:cql_transaction_info_hook(AccessToken, Connection, DBMS, Goal, Info),
        !.
transaction_info_or_null(_, _, _, _, {null}).
514current_transaction_id(TransactionId):-
515 transaction_context(TransactionId, _, _, _)