1 module fdb.transaction; 2 3 import 4 std.array, 5 std.conv, 6 std.exception; 7 8 import 9 fdb.database, 10 fdb.direct, 11 fdb.disposable, 12 fdb.error, 13 fdb.fdb_c, 14 fdb.fdb_c_options, 15 fdb.future, 16 fdb.range, 17 fdb.rangeinfo; 18 19 shared interface IReadOnlyTransaction 20 { 21 @property bool isSnapshot(); 22 23 shared(KeyFuture) getKey( 24 const Selector selector, 25 KeyFutureCallback callback = null); 26 27 shared(ValueFuture) get( 28 const Key key, 29 ValueFutureCallback callback = null); 30 31 /** 32 * Returns: Key-value pairs within (begin, end) range 33 */ 34 shared(KeyValueFuture) getRange( 35 RangeInfo info, 36 KeyValueFutureCallback callback = null); 37 38 void addReadConflictRange(RangeInfo info); 39 void addWriteConflictRange(RangeInfo info); 40 41 shared(VoidFuture) onError( 42 const FDBException ex, 43 VoidFutureCallback callback = null); 44 45 shared(VersionFuture) getReadVersion(VersionFutureCallback callback = null); 46 47 long getCommittedVersion(); 48 49 shared(StringFuture) getAddressesForKey( 50 const Key key, 51 StringFutureCallback callback = null); 52 53 shared(Value) opIndex(const Key key); 54 55 RecordRange opIndex(RangeInfo info); 56 } 57 58 alias WorkFunc = void delegate(shared Transaction tr, VoidFutureCallback cb); 59 60 shared class Transaction : IDirect, IDisposable, IReadOnlyTransaction 61 { 62 private const Database db; 63 private TransactionHandle th; 64 private const bool _isSnapshot; 65 66 @property bool isSnapshot() 67 { 68 return _isSnapshot; 69 } 70 71 private IDisposable[] futures; 72 73 invariant() 74 { 75 assert(db !is null); 76 } 77 78 this( 79 TransactionHandle th, 80 const shared Database db) 81 in 82 { 83 enforce(db !is null, "db must be set"); 84 enforce(th !is null, "th must be set"); 85 } 86 body 87 { 88 this.th = cast(shared)th; 89 this.db = db; 90 this._isSnapshot = false; 91 } 92 93 invariant() 94 { 95 assert(db !is null); 96 } 97 98 private this( 99 shared TransactionHandle th, 100 const shared Database db, 101 const bool isSnapshot) 102 in 103 { 104 enforce(db !is null, "db must be set"); 105 enforce(th !is null, "th must be set"); 106 } 107 body 108 { 109 this.th = cast(shared)th; 110 this.db = db; 111 this._isSnapshot = isSnapshot; 112 } 113 114 @property shared(IReadOnlyTransaction) snapshot() 115 { 116 auto snapshot = new shared Transaction(th, db, true); 117 return cast(shared IReadOnlyTransaction)snapshot; 118 } 119 120 ~this() 121 { 122 dispose; 123 } 124 125 void dispose() 126 { 127 // parent transaction should handle destruction 128 if (!th || isSnapshot) return; 129 130 fdb_transaction_destroy(cast(TransactionHandle)th); 131 th = null; 132 } 133 134 void set(const Key key, const Value value) const 135 in 136 { 137 enforce(key !is null); 138 enforce(!key.empty); 139 enforce(value !is null); 140 enforce(!value.empty); 141 } 142 body 143 { 144 fdb_transaction_set( 145 cast(TransactionHandle)th, 146 &key[0], 147 cast(int)key.length, 148 &value[0], 149 cast(int)value.length); 150 } 151 152 auto commit(VoidFutureCallback callback = null) 153 { 154 // cancel, commit and reset are mutually exclusive 155 synchronized (this) 156 { 157 auto fh = fdb_transaction_commit(cast(TransactionHandle)th); 158 auto future = startOrCreateFuture!VoidFuture(fh, this, callback); 159 futures ~= future; 160 return future; 161 } 162 } 163 164 void cancel() 165 { 166 // cancel, commit and reset are mutually exclusive 167 synchronized (this) 168 fdb_transaction_cancel(cast(TransactionHandle)th); 169 } 170 171 /** 172 * Resets transaction to its initial state 173 */ 174 void reset() 175 { 176 // cancel, commit and reset are mutually exclusive 177 synchronized (this) 178 fdb_transaction_reset(cast(TransactionHandle)th); 179 } 180 181 void clear(const Key key) const 182 in 183 { 184 enforce(key !is null); 185 enforce(!key.empty); 186 } 187 body 188 { 189 fdb_transaction_clear( 190 cast(TransactionHandle)th, 191 &key[0], 192 cast(int)key.length); 193 } 194 195 void clearRange(const RangeInfo info) const 196 in 197 { 198 enforce(!info.begin.key.empty); 199 enforce(!info.end.key.empty); 200 } 201 body 202 { 203 fdb_transaction_clear_range( 204 cast(TransactionHandle)th, 205 &info.begin.key[0], 206 cast(int)info.begin.key.length, 207 &info.end.key[0], 208 cast(int)info.end.key.length); 209 } 210 211 shared(KeyFuture) getKey( 212 const Selector selector, 213 KeyFutureCallback callback = null) 214 { 215 auto fh = fdb_transaction_get_key( 216 cast(TransactionHandle)th, 217 &selector.key[0], 218 cast(int)selector.key.length, 219 cast(fdb_bool_t)selector.orEqual, 220 selector.offset, 221 cast(fdb_bool_t)_isSnapshot); 222 223 auto future = startOrCreateFuture!KeyFuture(fh, this, callback); 224 synchronized (this) 225 futures ~= future; 226 return future; 227 } 228 229 shared(ValueFuture) get( 230 const Key key, 231 ValueFutureCallback callback = null) 232 in 233 { 234 enforce(key !is null); 235 enforce(!key.empty); 236 } 237 body 238 { 239 auto fh = fdb_transaction_get( 240 cast(TransactionHandle)th, 241 &key[0], 242 cast(int)key.length, 243 cast(fdb_bool_t)_isSnapshot); 244 245 auto future = startOrCreateFuture!ValueFuture(fh, this, callback); 246 synchronized (this) 247 futures ~= future; 248 return future; 249 } 250 251 /** 252 * Returns: Key-value pairs within (begin, end) range 253 */ 254 shared(KeyValueFuture) getRange( 255 RangeInfo info, 256 KeyValueFutureCallback callback = null) 257 { 258 auto begin = sanitizeKey(info.begin.key, [ 0x00 ]); 259 auto end = sanitizeKey(info.end.key, [ 0xff ]); 260 261 auto fh = fdb_transaction_get_range( 262 cast(TransactionHandle)cast(TransactionHandle)th, 263 264 &begin[0], 265 cast(int)begin.length, 266 cast(fdb_bool_t)info.begin.orEqual, 267 info.begin.offset, 268 269 &end[0], 270 cast(int)end.length, 271 cast(fdb_bool_t)info.end.orEqual, 272 info.end.offset, 273 274 info.limit, 275 0, 276 info.mode, 277 info.iteration, 278 cast(fdb_bool_t)_isSnapshot, 279 info.reverse); 280 281 auto future = startOrCreateFuture!KeyValueFuture( 282 fh, this, info, callback); 283 synchronized (this) 284 futures ~= future; 285 return future; 286 } 287 288 auto watch(const Key key, VoidFutureCallback callback = null) 289 in 290 { 291 enforce(key !is null); 292 enforce(!key.empty); 293 } 294 body 295 { 296 auto fh = fdb_transaction_watch( 297 cast(TransactionHandle)th, 298 &key[0], 299 cast(int)key.length); 300 auto future = startOrCreateFuture!WatchFuture(fh, this, callback); 301 synchronized (this) 302 futures ~= future; 303 return future; 304 } 305 306 private void addConflictRange( 307 RangeInfo info, 308 const ConflictRangeType type) const 309 in 310 { 311 enforce(!info.begin.key.empty); 312 enforce(!info.end.key.empty); 313 } 314 body 315 { 316 auto err = fdb_transaction_add_conflict_range( 317 cast(TransactionHandle)th, 318 &info.begin.key[0], 319 cast(int)info.begin.key.length, 320 &info.end.key[0], 321 cast(int)info.end.key.length, 322 type); 323 enforceError(err); 324 } 325 326 void addReadConflictRange(RangeInfo info) const 327 { 328 addConflictRange(info, ConflictRangeType.READ); 329 } 330 331 void addWriteConflictRange(RangeInfo info) const 332 { 333 addConflictRange(info, ConflictRangeType.WRITE); 334 } 335 336 shared(VoidFuture) onError( 337 const FDBException ex, 338 VoidFutureCallback callback = null) 339 { 340 auto fh = fdb_transaction_on_error( 341 cast(TransactionHandle)th, 342 ex.err); 343 auto future = startOrCreateFuture!VoidFuture(fh, this, callback); 344 synchronized (this) 345 futures ~= future; 346 return future; 347 } 348 349 void setReadVersion(const int ver) const 350 in 351 { 352 enforce(ver > 0); 353 } 354 body 355 { 356 fdb_transaction_set_read_version( 357 cast(TransactionHandle)th, 358 ver); 359 } 360 361 shared(VersionFuture) getReadVersion(VersionFutureCallback callback = null) 362 { 363 auto fh = fdb_transaction_get_read_version( 364 cast(TransactionHandle)th); 365 auto future = startOrCreateFuture!VersionFuture(fh, this, callback); 366 synchronized (this) 367 futures ~= future; 368 return future; 369 } 370 371 long getCommittedVersion() const 372 { 373 long ver; 374 auto err = fdb_transaction_get_committed_version( 375 cast(TransactionHandle)th, 376 &ver); 377 enforceError(err); 378 return ver; 379 } 380 381 shared(StringFuture) getAddressesForKey( 382 const Key key, 383 StringFutureCallback callback = null) 384 in 385 { 386 enforce(key !is null); 387 enforce(!key.empty); 388 } 389 body 390 { 391 auto fh = fdb_transaction_get_addresses_for_key( 392 cast(TransactionHandle)th, 393 &key[0], 394 cast(int)key.length); 395 396 auto future = startOrCreateFuture!StringFuture(fh, this, callback); 397 synchronized (this) 398 futures ~= future; 399 return future; 400 } 401 402 /** 403 * Performs an addition of little-endian integers. If the existing value 404 * in the database is not present or shorter than ``param``, it is first 405 * extended to the length of ``param`` with zero bytes. If ``param`` is 406 * shorter than the existing value in the database, the existing value is 407 * truncated to match the length of ``param``. The integers to be added 408 * must be stored in a little-endian representation. They can be signed 409 * in two's complement representation or unsigned. You can add to an integer 410 * at a known offset in the value by prepending the appropriate number of 411 * zero bytes to ``param`` and padding with zero bytes to match the length 412 * of the value. However, this offset technique requires that you know the 413 * addition will not cause the integer field within the value to overflow. 414 */ 415 void add(const Key key, const Value value) const 416 { 417 callAtomicOperation(key, value, MutationType.ADD); 418 } 419 420 // Deprecated 421 // ADD_MUTATION_TYPE("and", 6); 422 423 /** 424 * Performs a bitwise ``and`` operation. If the existing value in the 425 * database is not present or shorter than ``param``, it is first extended 426 * to the length of ``param`` with zero bytes. If ``param`` is shorter than 427 * the existing value in the database, the existing value is truncated to 428 * match the length of ``param``. 429 */ 430 void bitAnd(const Key key, const Value value) const 431 { 432 callAtomicOperation(key, value, MutationType.BIT_AND); 433 } 434 435 // Deprecated 436 //ADD_MUTATION_TYPE("or", 7); 437 438 /** 439 * Performs a bitwise ``or`` operation. If the existing value in the 440 * database is not present or shorter than ``param``, it is first extended 441 * to the length of ``param`` with zero bytes. If ``param`` is shorter than 442 * the existing value in the database, the existing value is truncated to 443 * match the length of ``param``. 444 */ 445 void bitOr(const Key key, const Value value) const 446 { 447 callAtomicOperation(key, value, MutationType.BIT_OR); 448 } 449 450 // Deprecated 451 // ADD_MUTATION_TYPE("xor", 8); 452 453 /** 454 * Performs a bitwise ``xor`` operation. If the existing value in the 455 * database is not present or shorter than ``param``, it is first extended 456 * to the length of ``param`` with zero bytes. If ``param`` is shorter than 457 * the existing value in the database, the existing value is truncated to 458 * match the length of ``param``. 459 */ 460 void bitXor(const Key key, const Value value) const 461 { 462 callAtomicOperation(key, value, MutationType.BIT_XOR); 463 } 464 465 private void callAtomicOperation( 466 const Key key, 467 const Value value, 468 const MutationType type) const 469 in 470 { 471 enforce(key !is null); 472 enforce(!key.empty); 473 enforce(value !is null); 474 enforce(!value.empty); 475 } 476 body 477 { 478 fdb_transaction_atomic_op( 479 cast(TransactionHandle)th, 480 &key[0], 481 cast(int)key.length, 482 &value[0], 483 cast(int)value.length, 484 type); 485 } 486 487 /** 488 * The transaction, if not self-conflicting, may be committed a second time 489 * after commit succeeds, in the event of a fault 490 * Parameter: Option takes no parameter 491 */ 492 void setCausalWriteRisky() const 493 { 494 setTransactionOption(TransactionOption.CAUSAL_WRITE_RISKY); 495 } 496 497 /** 498 * The read version will be committed, and usually will be the latest 499 * committed, but might not be the latest committed in the event of a fault 500 * or partition 501 * Parameter: Option takes no parameter 502 */ 503 void setCausalReadRisky() const 504 { 505 setTransactionOption(TransactionOption.CAUSAL_READ_RISKY); 506 } 507 508 // Parameter: Option takes no parameter 509 void setCausalReadDisable() const 510 { 511 setTransactionOption(TransactionOption.CAUSAL_READ_DISABLE); 512 } 513 514 /** 515 * The next write performed on this transaction will not generate a write 516 * conflict range. As a result, other transactions which read the key(s) 517 * being modified by the next write will not conflict with this transaction. 518 * Care needs to be taken when using this option on a transaction that is 519 * shared between multiple threads. When setting this option, write conflict 520 * ranges will be disabled on the next write operation, regardless of what 521 * thread it is on. 522 * Parameter: Option takes no parameter 523 */ 524 void setNextWriteNoWriteConflictRange() const 525 { 526 setTransactionOption( 527 TransactionOption.NEXT_WRITE_NO_WRITE_CONFLICT_RANGE); 528 } 529 530 // Parameter: Option takes no parameter 531 void setCheckWritesEnable() const 532 { 533 setTransactionOption(TransactionOption.CHECK_WRITES_ENABLE); 534 } 535 536 /** 537 * Reads performed by a transaction will not see any prior mutations that 538 * occured in that transaction, instead seeing the value which was in the 539 * database at the transaction's read version. This option may provide a 540 * small performance benefit for the client, but also disables a number of 541 * client-side optimizations which are beneficial for transactions which 542 * tend to read and write the same keys within a single transaction. 543 * Parameter: Option takes no parameter 544 */ 545 void setReadYourWritesDisable() const 546 { 547 setTransactionOption(TransactionOption.READ_YOUR_WRITES_DISABLE); 548 } 549 550 /** 551 * Disables read-ahead caching for range reads. Under normal operation, a 552 * transaction will read extra rows from the database into cache if range 553 * reads are used to page through a series of data one row at a time (i.e. 554 * if a range read with a one row limit is followed by another one row range 555 * read starting immediately after the result of the first). 556 * Parameter: Option takes no parameter 557 */ 558 void setReadAheadDisable() const 559 { 560 setTransactionOption(TransactionOption.READ_AHEAD_DISABLE); 561 } 562 563 // Parameter: Option takes no parameter 564 void setDurabilityDatacenter() const 565 { 566 setTransactionOption(TransactionOption.DURABILITY_DATACENTER); 567 } 568 569 // Parameter: Option takes no parameter 570 void setDurabilityRisky() const 571 { 572 setTransactionOption(TransactionOption.DURABILITY_RISKY); 573 } 574 575 // Parameter: Option takes no parameter 576 void setDurabilityDevNullIsWebScale() const 577 { 578 setTransactionOption( 579 TransactionOption.DURABILITY_DEV_NULL_IS_WEB_SCALE); 580 } 581 582 /** 583 * Specifies that this transaction should be treated as highest priority and 584 * that lower priority transactions should block behind this one. Use is 585 * discouraged outside of low-level tools 586 * Parameter: Option takes no parameter 587 */ 588 void setPrioritySystemImmediate() const 589 { 590 setTransactionOption(TransactionOption.PRIORITY_SYSTEM_IMMEDIATE); 591 } 592 593 /** 594 * Specifies that this transaction should be treated as low priority and 595 * that default priority transactions should be processed first. Useful for 596 * doing batch work simultaneously with latency-sensitive work 597 * Parameter: Option takes no parameter 598 */ 599 void setPriorityBatch() const 600 { 601 setTransactionOption(TransactionOption.PRIORITY_BATCH); 602 } 603 604 /** 605 * This is a write-only transaction which sets the initial configuration 606 * Parameter: Option takes no parameter 607 */ 608 void setInitializeNewDatabase() const 609 { 610 setTransactionOption(TransactionOption.INITIALIZE_NEW_DATABASE); 611 } 612 613 /** 614 * Allows this transaction to read and modify system keys (those that start 615 * with the byte 0xFF) 616 * Parameter: Option takes no parameter 617 */ 618 void setAccessSystemKeys() const 619 { 620 setTransactionOption(TransactionOption.ACCESS_SYSTEM_KEYS); 621 } 622 623 // Parameter: Option takes no parameter 624 void setDebugDump() const 625 { 626 setTransactionOption(TransactionOption.DEBUG_DUMP); 627 } 628 629 /** 630 * Set a timeout in milliseconds which, when elapsed, will cause the 631 * transaction automatically to be cancelled. Valid parameter values are 632 * ``[0, INT_MAX]``. If set to 0, will disable all timeouts. All pending and 633 * any future uses of the transaction will throw an exception. The 634 * transaction can be used again after it is reset. 635 * Parameter: (Int) value in milliseconds of timeout 636 */ 637 void setTimeout(const long value) const 638 { 639 setTransactionOption(TransactionOption.TIMEOUT, value); 640 } 641 642 /** 643 * Set a maximum number of retries after which additional calls to onError 644 * will throw the most recently seen error code. Valid parameter values are 645 * ``[-1, INT_MAX]``. If set to -1, will disable the retry limit. 646 * Parameter: (Int) number of times to retry 647 */ 648 void setRetryLimit(const long value) const 649 { 650 setTransactionOption(TransactionOption.RETRY_LIMIT, value); 651 } 652 653 private void setTransactionOption(const TransactionOption op) const 654 { 655 auto err = fdb_transaction_set_option( 656 cast(TransactionHandle)th, 657 op, 658 null, 659 0); 660 enforceError(err); 661 } 662 663 private void setTransactionOption( 664 const TransactionOption op, 665 const long value) const 666 { 667 auto err = fdb_transaction_set_option( 668 cast(TransactionHandle)th, 669 op, 670 cast(immutable(char)*)&value, 671 cast(int)value.sizeof); 672 enforceError(err); 673 } 674 675 shared(Value) opIndex(const Key key) 676 { 677 auto f = get(key); 678 auto value = f.await; 679 return value; 680 } 681 682 RecordRange opIndex(RangeInfo info) 683 { 684 auto f = getRange(info); 685 auto value = cast(RecordRange)f.await; 686 return value; 687 } 688 689 inout(Value) opIndexAssign(inout(Value) value, const Key key) 690 { 691 set(key, value); 692 return value; 693 } 694 695 void run(SimpleWorkFunc func) 696 { 697 WorkFunc wf = (tr, cb) 698 { 699 func(tr); 700 cb(null); 701 }; 702 703 VoidFutureCallback cb = (ex) 704 { 705 enforce(ex is null, ex); 706 }; 707 708 auto future = createFuture!retryLoop(this, wf, cb); 709 future.await; 710 }; 711 712 auto doTransaction( 713 WorkFunc func, 714 VoidFutureCallback commitCallback) 715 { 716 auto future = createFuture!retryLoop(this, func, commitCallback); 717 return future; 718 }; 719 } 720 721 void retryLoop( 722 shared Transaction tr, 723 WorkFunc func, 724 VoidFutureCallback cb) 725 { 726 try 727 { 728 func(tr, (ex) 729 { 730 if (ex) 731 onError(tr, ex, func, cb); 732 else 733 { 734 auto future = tr.commit((commitErr) 735 { 736 if (commitErr) 737 onError(tr, commitErr, func, cb); 738 else 739 cb(commitErr); 740 }); 741 future.await; 742 } 743 }); 744 } 745 catch (Exception ex) 746 { 747 onError(tr, ex, func, cb); 748 } 749 } 750 751 private void onError( 752 shared Transaction tr, 753 Exception ex, 754 WorkFunc func, 755 VoidFutureCallback cb) 756 { 757 if (auto fdbex = cast(FDBException)ex) 758 { 759 tr.onError(fdbex, (retryErr) 760 { 761 if (retryErr) 762 cb(retryErr); 763 else 764 retryLoop(tr, func, cb); 765 }); 766 } 767 else 768 { 769 tr.cancel(); 770 cb(ex); 771 } 772 };