35
43
44:-module(cql_database,
45 [get_transaction_context/5,
46 odbc_execute_with_statement_cache/7,
47 save_database_event/6,
48 application_value_to_odbc_value/7,
49 odbc_value_to_application_value/5,
50 cql_transaction/3,
51 database_transaction_query_info/3,
52 current_transaction_id/1,
53 transaction_active/0,
54 register_database_connection_details/2,
55 resolve_deadlock/1,
56 database_connection_details/2,
57 odbc_connection_call/3,
58 update_history/14,
59 odbc_cleanup_and_disconnect/1]). 60
61:-use_module(library(cql/cql)). 62
63:-dynamic
64 database_connection_details/2. 65:-volatile
66 database_connection_details/2. 67
68:-thread_local
69 database_event/6,
70 transaction_active/0,
71 transaction_context/4,
72 database_transaction_query_info/3. 73
%%      get_transaction_context(-TransactionId, -TrxId, -AccessToken, -TransactionTimestamp, -Connection)
%
%       Unify the arguments with the transaction_context/4 fact recorded
%       for the calling thread.  TrxId is always unified with {null}.
%
%       @throws no_database_transaction_active when no transaction_context/4
%               fact exists.

get_transaction_context(TransactionId, TrxId, AccessToken, TransactionTimestamp, Connection) :-
        ( transaction_context(StoredId, StoredToken, StoredTimestamp, StoredConnection) ->
            % Unify only after committing to the stored fact, so a mismatch
            % with an already-bound argument fails instead of throwing
            context(TransactionId, TrxId, AccessToken, TransactionTimestamp, Connection) =
                context(StoredId, {null}, StoredToken, StoredTimestamp, StoredConnection)

        ; otherwise ->
            throw(no_database_transaction_active)
        ).
85
86
87:-meta_predicate
88 odbc_connection_call(+, -, 0). 89
90:-thread_local
91 92 odbc_connection_available/2,
93 94 odbc_connection_in_use/1. 95
:-multifile(cql_max_db_connections_hook/1).
:-multifile(cql:odbc_connection_complete_hook/3).

%%      odbc_connection_call(+Schema, -Connection, :Goal)
%
%       Run Goal with Connection bound to an ODBC connection for Schema
%       taken from this thread's pool (odbc_connection_available/2).
%       When Goal finishes the connection is rolled back and returned to
%       the pool.
%
%       If the pool is empty, a new connection is opened from the details
%       stored in database_connection_details/2, unless the number of
%       odbc_connection_in_use/1 markers for Schema has reached the limit
%       given by cql_max_db_connections_hook/1 (default 10).
%
%       @throws invalid_connection_details(Details) if the registered
%               details are not driver_string/1, dsn/3 or dsn/1.
%       @throws no_database_connection_details if nothing is registered
%               for Schema.

odbc_connection_call(Schema, Connection, Goal) :-
        ( retract(odbc_connection_available(Schema, Connection)) ->
            % The in-use marker is asserted as the Setup goal so that it is
            % always paired with the cleanup handler below
            setup_call_cleanup(assert(odbc_connection_in_use(Schema)),
                               Goal,
                               release_odbc_connection(Schema, Connection))

        ; aggregate_all(r(count), odbc_connection_in_use(Schema), r(N)),
          ( cql_max_db_connections_hook(MaxDbConnections)->
              true
          ; otherwise->
              MaxDbConnections = 10
          ),
          N >= MaxDbConnections ->
            thread_self(ThreadId),

            cql_error(too_many_schema_connections, 'Too many connections on ~w: Maximum is ~w', [ThreadId, MaxDbConnections])

        ; database_connection_details(Schema, ConnectionDetails) ->
            ( ConnectionDetails = driver_string(DriverString) ->
                true

            ; ConnectionDetails = dsn(Dsn, Username, Password) ->
                gethostname(HostName),
                format(atom(DriverString), 'DSN=~w;UID=~w;PWD=~w;WSID=~w;', [Dsn, Username, Password, HostName])

            ; ConnectionDetails = dsn(Dsn) ->
                gethostname(HostName),
                format(atom(DriverString), 'DSN=~w;WSID=~w;', [Dsn, HostName])

            ; otherwise ->
                throw(invalid_connection_details(ConnectionDetails))
            ),

            odbc_connect(-,
                         Connection,
                         [driver_string(DriverString),
                          silent(true),
                          null({null}),
                          auto_commit(false),
                          wide_column_threshold(8000),
                          mars(true)]),

            % Make sure the connection is closed when this thread exits
            thread_at_exit(odbc_cleanup_and_disconnect(Connection)),
            assert(odbc_connection_available(Schema, Connection)),

            ignore(cql:odbc_connection_complete_hook(Schema, ConnectionDetails, Connection)),
            % The new connection is now in the pool: retry, taking the first branch
            odbc_connection_call(Schema, Connection, Goal)

        ; otherwise ->
            throw(no_database_connection_details)
        ).

%       release_odbc_connection(+Schema, +Connection)
%
%       Cleanup handler for odbc_connection_call/3.  The in-use marker is
%       removed first, so a rollback that raises or fails cannot leave the
%       slot permanently counted against the connection limit.  The
%       connection only goes back to the pool after a successful rollback.

release_odbc_connection(Schema, Connection) :-
        once(retract(odbc_connection_in_use(Schema))),
        odbc_end_transaction(Connection, rollback),
        assert(odbc_connection_available(Schema, Connection)).
151
152
153
154
155
166
%%      odbc_cleanup_and_disconnect(+Connection)
%
%       Run odbc_cleanup_and_disconnect_1/1 on Connection.  Any exception
%       it raises is caught and written to the log together with the id
%       of the calling thread, rather than being propagated.

odbc_cleanup_and_disconnect(Connection) :-
        catch_all(odbc_cleanup_and_disconnect_1(Connection),
                  Exception,
                  ( thread_self(Self),
                    cql_log([], error, '[~w] odbc_cleanup_and_disconnect/1 : ~w', [Self, Exception])
                  )).
172
%       odbc_cleanup_and_disconnect_1(+Connection)
%
%       Roll back Connection, free every prepared statement cached for
%       it, clear the LRU bookkeeping facts, disconnect, and finally
%       drop the sql_server_spid/4 facts recorded for the connection.
%       Each ODBC call is bracketed by odbc_cleanup debug messages.

odbc_cleanup_and_disconnect_1(Connection) :-
        thread_self(Self),
        traced_odbc_cleanup_step(Self, odbc_end_transaction(Connection, rollback)),
        forall(retract(cached_prepared_odbc_statement(_, _, Connection, _, _, Statement, _)),
               traced_odbc_cleanup_step(Self, odbc_free_statement(Statement))),
        retractall(lru_key(_)),
        retractall(lru_statement(_)),
        traced_odbc_cleanup_step(Self, odbc_disconnect(Connection)),
        retractall(sql_server_spid(Connection, _, _, _)).

%       traced_odbc_cleanup_step(+ThreadId, +Step)
%
%       Call Step, emitting an odbc_cleanup debug message before and after.

traced_odbc_cleanup_step(ThreadId, Step) :-
        debug(odbc_cleanup, 'BEFORE [~w] : ~w', [ThreadId, Step]),
        call(Step),
        debug(odbc_cleanup, 'AFTER [~w] : ~w', [ThreadId, Step]).
191
193
194:-thread_local
195 196 197 198 199 200 201 202 cached_prepared_odbc_statement/7. 203
204:-thread_local
205 lru_statement/1,
206 statement_locked/1,
207 lru_key/1. 208
%       max_lru_size(?MaxSize)
%
%       Size threshold for the prepared-statement cache.  It is read by
%       odbc_execute_with_statement_cache/7 to decide when to evict.

max_lru_size(MaxSize) :-
        MaxSize = 4000.
210
%       evict_cache_entries(+Key, +N)
%
%       Examine up to N entries from the front of the lru_statement/1
%       list.  An entry whose statement is not locked is removed from the
%       cache: the prepared statement is freed and the flag named Key is
%       decremented.  Fails if N is negative.

evict_cache_entries(_, 0):- !.
evict_cache_entries(Key, N):-
        N > 0,
        retract(lru_statement(MutexId)),
        ( statement_locked(MutexId)->
            % Still in use by an open query: keep it cached and re-register
            % it at the back of the LRU list, so the list stays consistent
            % with the cache and the entry remains evictable later
            assertz(lru_statement(MutexId))
        ; otherwise->
            thread_self(ThreadId),
            retract(cached_prepared_odbc_statement(Sql, _, _, _, _, Statement, MutexId)),
            odbc_free_statement(Statement),
            debug(odbc_statement_cache, 'CACHE-EVICT [~w] ~w : ~@', [ThreadId, Statement, trimmed_sql(Sql, 80)]),
            flag(Key, X, X-1)
        ),
        NN is N-1,
        !,
        evict_cache_entries(Key, NN).
evict_cache_entries(_, N):-
        % No (further) evictable LRU entry: stop quietly rather than
        % failing the query that triggered the eviction
        N > 0.
230
231
%%      odbc_execute_with_statement_cache(+Connection, +FileName, +LineNumber, +Sql, +OdbcParameters, +OdbcParameterDataTypes, -Row)
%
%       Execute Sql on Connection through a cache of prepared statements
%       keyed on Sql, OdbcParameterDataTypes and Connection.
%
%       The first clause handles a cache hit: the statement is moved to
%       the back of the LRU list and executed.  The second clause handles
%       a miss: the statement is prepared, stored, older entries are
%       evicted once the cache exceeds max_lru_size/1, and the statement
%       is executed.  In both cases statement_locked/1 holds for the
%       statement while the execution goal is active.

odbc_execute_with_statement_cache(Connection, _, _, Sql, OdbcParameters, OdbcParameterDataTypes, Row) :-
        cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, _, _, Statement, MutexId),
        !,
        setup_call_cleanup(assert(statement_locked(MutexId)),
                           ( thread_self(ThreadId),
                             % The LRU fact may be missing: odbc_cleanup_and_disconnect_1/1
                             % clears every lru_statement/1 fact but only removes the cache
                             % entries of the connection being closed.  Re-register the
                             % statement instead of failing the query.
                             ignore(retract(lru_statement(MutexId))),
                             assertz(lru_statement(MutexId)),
                             debug(odbc_statement_cache, 'CACHE-HIT [~w] ~w : ~@', [ThreadId, Statement, trimmed_sql(Sql, 80)]),
                             odbc_execute_with_statistics(Statement, OdbcParameters, OdbcParameterDataTypes, Row)
                           ),
                           retract(statement_locked(MutexId))).

odbc_execute_with_statement_cache(Connection, FileName, LineNumber, Sql, OdbcParameters, OdbcParameterDataTypes, Row) :-
        thread_self(ThreadId),
        debug(odbc_statement_cache, 'CACHE-MISS [~w] : ~@', [ThreadId, trimmed_sql(Sql, 80)]),
        odbc_prepare(Connection, Sql, OdbcParameterDataTypes, Statement, []),
        gensym(statement_lock_, MutexId),
        % The cache size is kept in a flag whose name is generated once per thread
        ( lru_key(Key)->
            true
        ; otherwise->
            gensym(lru_key_, Key),
            assert(lru_key(Key))
        ),
        setup_call_cleanup(assert(statement_locked(MutexId)),
                           ( assertz(cached_prepared_odbc_statement(Sql, OdbcParameterDataTypes, Connection, FileName, LineNumber, Statement, MutexId)),
                             assertz(lru_statement(MutexId)),
                             max_lru_size(MaxSize),
                             % CacheSize is the size BEFORE this statement was added
                             flag(Key, CacheSize, CacheSize+1),
                             ( CacheSize >= MaxSize->
                                 % The cache now holds CacheSize+1 entries; evict enough
                                 % to get back down to MaxSize
                                 Delta is CacheSize + 1 - MaxSize,
                                 evict_cache_entries(Key, Delta)
                             ; otherwise->
                                 true
                             ),
                             debug(odbc_statement_cache, 'CACHE-STORE [~w] ~w, ~w : ~@', [ThreadId, Statement, MutexId, trimmed_sql(Sql, 60)]),
                             odbc_execute_with_statistics(Statement, OdbcParameters, OdbcParameterDataTypes, Row)
                           ),
                           retract(statement_locked(MutexId))).
271
272
273
%%      save_database_event(+AccessToken, +EventType, +Schema, +TableName, +PrimaryKeyColumnName, +PrimaryKey)
%
%       Record a database_event/6 fact for the given arguments unless a
%       matching fact is already stored.

save_database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey) :-
        % Already recorded: nothing to do
        database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey),
        !.
save_database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey) :-
        assert(database_event(AccessToken, EventType, Schema, TableName, PrimaryKeyColumnName, PrimaryKey)).
295
296
297:-meta_predicate(cql_transaction(+, +, 0)). 298
%%      cql_transaction(+Schema, +AccessToken, :Goal)
%
%       Run Goal as a database transaction on Schema by calling
%       cql_transaction_1/4.  When it finishes, the thread's
%       transaction bookkeeping facts are cleared and the
%       transaction_count flag is incremented.  If it succeeded, the
%       collected database events are then passed to
%       cql_process_database_events/1.
%
%       @throws database_transaction_already_in_progress(TransactionId)
%               if this thread already has a transaction_context/4 fact.

cql_transaction(Schema, AccessToken, Goal):-
        % Reject nested transactions BEFORE installing the cleanup handler.
        % Otherwise the handler of the rejected inner call would wipe the
        % transaction_context/4, database_event/6 and transaction_active/0
        % facts that belong to the outer, still running transaction.
        ( transaction_context(ExistingTransactionId, _, _, _) ->
            throw(database_transaction_already_in_progress(ExistingTransactionId))
        ; otherwise ->
            true
        ),
        thread_self(ThreadId),
        setup_call_cleanup(assert(transaction_active),
                           cql_transaction_1(Schema, AccessToken, Goal, DatabaseEventsSet),
                           ( retractall(database_transaction_query_info(ThreadId, _, _)),
                             retractall(transaction_context(_, _, _, _)),
                             retractall(database_event(_, _, _, _, _, _)),
                             flag(transaction_count, Count, Count+1),
                             retractall(transaction_active))),
        cql_process_database_events(DatabaseEventsSet).
309
%       cql_transaction_1(+Schema, +AccessToken, :Goal, -DatabaseEventsSet)
%
%       Refuse to start if the thread already has a transaction_context/4
%       fact; otherwise run cql_transaction_2/4 under resolve_deadlock/1.

cql_transaction_1(Schema, AccessToken, Goal, DatabaseEventsSet):-
        ensure_no_transaction_in_progress,
        resolve_deadlock(cql_transaction_2(Schema, AccessToken, Goal, DatabaseEventsSet)).

%       ensure_no_transaction_in_progress
%
%       Throws database_transaction_already_in_progress(Id) if a
%       transaction_context/4 fact exists, and succeeds otherwise.

ensure_no_transaction_in_progress :-
        transaction_context(RunningTransactionId, _, _, _),
        !,
        throw(database_transaction_already_in_progress(RunningTransactionId)).
ensure_no_transaction_in_progress.
317
%       cql_transaction_2(+Schema, +AccessToken, :Goal, -DatabaseEventsSet)
%
%       Obtain a connection for Schema through odbc_connection_call/3 and
%       run the transaction body on it.

cql_transaction_2(Schema, AccessToken, Goal, DatabaseEventsSet) :-
        odbc_connection_call(Schema,
                             Connection,
                             cql_transaction_on_connection(Schema, AccessToken, Goal, DatabaseEventsSet, Connection)).

%       cql_transaction_on_connection(+Schema, +AccessToken, :Goal, -DatabaseEventsSet, +Connection)
%
%       Generate a transaction id, record the transaction info and the
%       transaction_context/4 fact, then run cql_transaction_3/5.  If that
%       fails, the failure is logged and this predicate fails as well.

cql_transaction_on_connection(Schema, AccessToken, Goal, DatabaseEventsSet, Connection) :-
        generate_transaction_id(Schema, Connection, TransactionId),
        dbms(Schema, DBMS),
        store_transaction_info(AccessToken, Connection, DBMS, Goal),
        get_time(ExecutionTime),
        assert(transaction_context(TransactionId, AccessToken, ExecutionTime, Connection)),

        ( cql_transaction_3(Goal, Connection, TransactionId, AccessToken, DatabaseEventsSet) ->
            true
        ; otherwise ->
            log_transaction_state(AccessToken, TransactionId, transaction_rolled_back_on_logic_failure),
            fail
        ).

%       generate_transaction_id(+Schema, +Connection, -TransactionId)
%
%       Ask the database for a fresh identifier, using a query chosen by
%       the DBMS registered for Schema.  Throws no_dbms_for_schema(Schema)
%       for any other DBMS.

generate_transaction_id(Schema, Connection, TransactionId) :-
        ( dbms(Schema, 'Microsoft SQL Server')->
            odbc_query(Connection, 'SELECT CONVERT(VARCHAR(36), NEWID())', row(TransactionId))
        ; dbms(Schema, 'PostgreSQL') ->
            odbc_query(Connection, 'SELECT uuid_generate_v1()', row(TransactionId))
        ; dbms(Schema, 'SQLite') ->
            odbc_query(Connection, 'SELECT substr(u,1,8)||\'-\'||substr(u,9,4)||\'-4\'||substr(u,13,3)||\'-\'||v||substr(u,17,3)||\'-\'||substr(u,21,12) from (select lower(hex(randomblob(16))) as u, substr(\'89ab\',abs(random()) % 4 + 1, 1) as v)', row(TransactionId))
        ; otherwise ->
            throw(no_dbms_for_schema(Schema))
        ).
342
343
344:-meta_predicate
345 cql_transaction_3(0, +, +, +, -). 346
%       cql_transaction_3(:Goal, +Connection, +TransactionId, +AccessToken, -DatabaseEventsSet)
%
%       Log the start of the transaction and run Goal.  The
%       database_event/6 facts stored for AccessToken are then collected
%       (and removed) into DatabaseEventsSet.  If Goal raised an
%       exception, the rollback is logged and the exception re-thrown;
%       otherwise the connection is committed and the commit logged.

cql_transaction_3(Goal, Connection, TransactionId, AccessToken, DatabaseEventsSet) :-
        log_transaction_state(AccessToken, TransactionId, transaction_starting),
        % Caught stays unbound unless Goal raised an exception
        catch(Goal, Caught, true),
        Event = database_event(AccessToken, _EventType, _Schema, _TableName, _PrimaryKeyColumnName, _PrimaryKey),
        findall(Event, retract(Event), Events),
        list_to_set(Events, DatabaseEventsSet),
        ( nonvar(Caught) ->
            log_transaction_state(AccessToken, TransactionId, transaction_rolled_back_on_error),
            throw(Caught)

        ; otherwise ->
            odbc_end_transaction(Connection, commit),
            log_transaction_state(AccessToken, TransactionId, transaction_committed)
        ).
365
366
379
380:-meta_predicate
381 resolve_deadlock(0). 382
%%      resolve_deadlock(:Goal)
%
%       Call Goal, retrying it when it raises the ODBC error with SQL
%       state 40001.  Attempts are numbered by between/3 up to
%       maximum_deadlock_retries/1; reaching the maximum throws instead
%       of running Goal again.  The predicate succeeds or fails according
%       to whether the last attempt of Goal succeeded.
%
%       @throws deadlock_retry_count_exceeded(MaximumDeadlockRetries)

resolve_deadlock(Goal) :-
        thread_self(ThreadId),
        flag(transaction_count, InitialCount, InitialCount),

        maximum_deadlock_retries(MaximumDeadlockRetries),
        % Failure of an attempt (see the recovery goal below) backtracks
        % into between/3, which supplies the next attempt number
        between(1, MaximumDeadlockRetries, Attempt),
        prepare_deadlock_attempt(Attempt, MaximumDeadlockRetries, InitialCount, ThreadId, Goal),

        catch_all((Goal ->
                     LogicalStatus = 1
                  ; otherwise ->
                     true
                  ),
                  error(odbc('40001', _, _), _),
                  ( cql_log([debug(deadlocks)], warning, 'DEADLOCK_DETECTED\tThread \'~w\' selected as DEADLOCK VICTIM. Goal: ~w', [ThreadId, Goal]),
                    retractall(database_transaction_query_info(ThreadId, _, _)),
                    retractall(transaction_context(_, _, _, _)),
                    retractall(database_event(_, _, _, _, _, _)),
                    fail)),
        ( Attempt > 1 ->
            cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLVED\tdeadlocked transaction on thread \'~w\' RESOLVED (attempt ~w).', [ThreadId, Attempt])

        ; otherwise ->
            true
        ),
        % Commit to this attempt, then report whether Goal itself succeeded
        !,
        LogicalStatus == 1.

%       prepare_deadlock_attempt(+Attempt, +MaximumDeadlockRetries, +InitialCount, +ThreadId, +Goal)
%
%       Bookkeeping done before attempt number Attempt:
%         - on the last permitted attempt number, log and throw;
%         - on a retry, sleep for (2 << Attempt)/1000.0 seconds if the
%           transaction_count flag still equals InitialCount, otherwise
%           just log that another transaction completed;
%         - on the first attempt, do nothing.

prepare_deadlock_attempt(Attempt, MaximumDeadlockRetries, _, ThreadId, Goal) :-
        Attempt >= MaximumDeadlockRetries,
        !,
        cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_FAILED\tCOULD NOT RESOLVE deadlock on thread \'~w\'. Goal: ~w', [ThreadId, Goal]),
        throw(deadlock_retry_count_exceeded(MaximumDeadlockRetries)).
prepare_deadlock_attempt(Attempt, _, InitialCount, ThreadId, _) :-
        Attempt > 1,
        !,
        flag(transaction_count, CurrentCount, CurrentCount),
        ( CurrentCount =:= InitialCount ->
            Delay is ( 2 << Attempt) / 1000.0,
            sleep(Delay),
            cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_ATTEMPT\tRETRYING deadlocked transaction on thread \'~w\'(attempt ~w). Initiated by EXPIRY of RANDOM WAIT of ~w seconds.', [ThreadId, Attempt, Delay])

        ; otherwise ->
            cql_log([debug(deadlocks)], warning, 'DEADLOCK_RESOLUTION_ATTEMPT\tRETRYING deadlocked transaction on thread \'~w\' (attempt ~w). Initiated by COMPLETION of a TRANSACTION on another thread.', [ThreadId, Attempt])
        ).
prepare_deadlock_attempt(_, _, _, _, _).
437
438
442
%       maximum_deadlock_retries(?MaximumDeadlockRetries)
%
%       Upper bound of the attempt counter used by resolve_deadlock/1.

maximum_deadlock_retries(MaximumDeadlockRetries) :-
        MaximumDeadlockRetries = 10.
444
446
%       log_transaction_state(+AccessToken, +TransactionId, +TransactionState)
%
%       Write an informational log line holding the user id derived from
%       AccessToken, the transaction id and the upper-cased state name.

log_transaction_state(AccessToken, TransactionId, TransactionState) :-
        cql_access_token_to_user_id(AccessToken, User),
        upcase_atom(TransactionState, StateLabel),
        cql_log([],
                informational,
                '\t~p\t~p\t~p',
                [User, TransactionId, StateLabel]).
451
452
458
%%      register_database_connection_details(+Schema, +ConnectionDetails)
%
%       Store ConnectionDetails for Schema as a
%       database_connection_details/2 fact, appended after any facts
%       already present.

register_database_connection_details(Schema, ConnectionDetails) :-
        Details = database_connection_details(Schema, ConnectionDetails),
        assertz(Details).
461
462
%%      update_history(+Schema, +TableName, +AttributeName, +PrimaryKeyAttributeName, +PrimaryKeyValue, +ApplicationValueBefore, +ApplicationValueAfter, +AccessToken, +Info, +TransactionId, +TransactionTimestamp, +ThreadId, +Connection, +Goal)
%
%       Forward all arguments to cql_update_history_hook/14.  Always
%       succeeds, whether or not the hook does.

update_history(Schema, TableName, AttributeName, PrimaryKeyAttributeName, PrimaryKeyValue, ApplicationValueBefore, ApplicationValueAfter, AccessToken, Info, TransactionId, TransactionTimestamp, ThreadId, Connection, Goal):-
        ( cql_update_history_hook(Schema,
                                  TableName,
                                  AttributeName,
                                  PrimaryKeyAttributeName,
                                  PrimaryKeyValue,
                                  ApplicationValueBefore,
                                  ApplicationValueAfter,
                                  AccessToken,
                                  Info,
                                  TransactionId,
                                  TransactionTimestamp,
                                  ThreadId,
                                  Connection,
                                  Goal) ->
            true
        ; true
        ).
465
466
467
:-multifile(cql:application_value_to_odbc_value_hook/7).

%%      application_value_to_odbc_value(+ApplicationValue, +OdbcDataType, +Schema, +TableName, +ColumnName, +Qualifiers, -OdbcValue)
%
%       Convert ApplicationValue with
%       cql:application_value_to_odbc_value_hook/7.  If the hook does
%       not succeed, OdbcValue is unified with ApplicationValue.
%
%       @throws instantiation_error(ApplicationValue) if ApplicationValue
%               is unbound.

application_value_to_odbc_value(ApplicationValue, _, _, _, _, _, _):-
        var(ApplicationValue),
        !,
        throw(instantiation_error(ApplicationValue)).
application_value_to_odbc_value(ApplicationValue, OdbcDataType, Schema, TableName, ColumnName, Qualifiers, OdbcValue):-
        cql:application_value_to_odbc_value_hook(OdbcDataType, Schema, TableName, ColumnName, Qualifiers, ApplicationValue, OdbcValue),
        !.
application_value_to_odbc_value(Value, _, _, _, _, _, Value).
478
479
%       odbc_numeric_precision_limit(?Limit)
%
%       Constant 27.  NOTE(review): no caller is visible in this part of
%       the file; the name suggests a numeric precision limit, but
%       confirm its use before changing it.

odbc_numeric_precision_limit(Limit) :-
        Limit = 27.
481
482
:-multifile(cql:odbc_value_to_application_value_hook/7).

%%      odbc_value_to_application_value(+Schema, +TableSpec, +ColumnName, +OdbcValue, -ApplicationValue)
%
%       Look up the column's database data type and domain with
%       cql_data_type/10 and convert OdbcValue with
%       cql:odbc_value_to_application_value_hook/7.  If the hook does not
%       succeed, ApplicationValue is unified with OdbcValue.  Fails if
%       cql_data_type/10 has no entry for the column.

odbc_value_to_application_value(Schema, TableSpec, ColumnName, OdbcValue, ApplicationValue):-
        cql_data_type(Schema, TableSpec, ColumnName, DatabaseDataType, _, _, _, Domain, _, _),
        !,
        odbc_value_via_hook(DatabaseDataType, Schema, TableSpec, ColumnName, Domain, OdbcValue, ApplicationValue).

%       odbc_value_via_hook(+DatabaseDataType, +Schema, +TableSpec, +ColumnName, +Domain, +OdbcValue, -ApplicationValue)
%
%       First solution of the conversion hook, or the unchanged value.

odbc_value_via_hook(DatabaseDataType, Schema, TableSpec, ColumnName, Domain, OdbcValue, ApplicationValue):-
        cql:odbc_value_to_application_value_hook(DatabaseDataType, Schema, TableSpec, ColumnName, Domain, OdbcValue, ApplicationValue),
        !.
odbc_value_via_hook(_, _, _, _, _, Value, Value).
493
%       catch_all(:Goal, ?Catcher, :Recovery)
%
%       Thin wrapper that calls catch/3 with the same arguments.

catch_all(Goal, Catcher, Recovery):-
        catch(Goal, Catcher, Recovery).
496
497
:-multifile(cql:process_database_events/1).

%       cql_process_database_events(+Events)
%
%       Hand Events to cql:process_database_events/1.  Always succeeds,
%       whether or not the hook does.

cql_process_database_events(Events):-
        ( cql:process_database_events(Events) ->
            true
        ; true
        ).
501
:-multifile(cql:cql_transaction_info_hook/5).

%       store_transaction_info(+AccessToken, +Connection, +DBMS, +Goal)
%
%       Record a database_transaction_query_info/3 fact for the calling
%       thread, holding Goal and the Info term produced by
%       cql:cql_transaction_info_hook/5.  Info is {null} when the hook
%       does not succeed.

store_transaction_info(AccessToken, Connection, DBMS, Goal):-
        transaction_info_or_null(AccessToken, Connection, DBMS, Goal, Info),
        thread_self(Self),
        assert(database_transaction_query_info(Self, Goal, Info)).

%       transaction_info_or_null(+AccessToken, +Connection, +DBMS, +Goal, -Info)
%
%       First solution of the info hook, or {null}.

transaction_info_or_null(AccessToken, Connection, DBMS, Goal, Info):-
        cql:cql_transaction_info_hook(AccessToken, Connection, DBMS, Goal, Info),
        !.
transaction_info_or_null(_, _, _, _, {null}).
511
513
514current_transaction_id(TransactionId):-
515 transaction_context(TransactionId, _, _, _)