1 module fdb.transaction; 2 3 import 4 std.array, 5 std.conv, 6 std.exception, 7 std.string; 8 9 import 10 fdb.context, 11 fdb.database, 12 fdb.disposable, 13 fdb.error, 14 fdb.fdb_c, 15 fdb.fdb_c_options, 16 fdb.future, 17 fdb.range, 18 fdb.rangeinfo; 19 20 shared interface IReadOnlyTransaction 21 { 22 @property bool isSnapshot(); 23 24 shared(Key) getKey(in Selector selector); 25 shared(KeyFuture) getKeyAsync( 26 in Selector selector, 27 KeyFutureCallback callback = null); 28 29 shared(Value) get(in Key key); 30 shared(ValueFuture) getAsync( 31 in Key key, 32 ValueFutureCallback callback = null); 33 34 /** 35 * Returns: Key-value pairs within (begin, end) range 36 */ 37 RecordRange getRange(RangeInfo info); 38 /// ditto 39 shared(KeyValueFuture) getRangeAsync( 40 RangeInfo info, 41 KeyValueFutureCallback callback = null); 42 43 void addReadConflictRange(RangeInfo info); 44 void addWriteConflictRange(RangeInfo info); 45 46 void onError(in FDBException ex); 47 shared(VoidFuture) onErrorAsync( 48 in FDBException ex, 49 VoidFutureCallback callback = null); 50 51 ulong getReadVersion(); 52 shared(VersionFuture) getReadVersionAsync( 53 VersionFutureCallback callback = null); 54 55 long getCommittedVersion(); 56 57 shared(string[]) getAddressesForKey(in Key key); 58 shared(StringFuture) getAddressesForKeyAsync( 59 in Key key, 60 StringFutureCallback callback = null); 61 62 shared(Value) opIndex(in Key key); 63 64 RecordRange opIndex(RangeInfo info); 65 } 66 67 shared class Transaction : IDatabaseContext, IDisposable, IReadOnlyTransaction 68 { 69 private const Database db; 70 private TransactionHandle th; 71 72 private const bool _isSnapshot; 73 @property bool isSnapshot() 74 { 75 return _isSnapshot; 76 } 77 78 private IDisposable[] futures; 79 80 invariant() 81 { 82 assert(db !is null); 83 } 84 85 this(TransactionHandle th, in shared Database db) 86 in 87 { 88 enforce(db !is null, "db must be set"); 89 enforce(th !is null, "th must be set"); 90 } 91 body 92 { 93 this.th = cast(shared)th; 94 this.db = db; 95 this._isSnapshot = false; 96 } 97 98 invariant() 99 { 100 assert(db !is null); 101 } 102 103 private this( 104 shared TransactionHandle th, 105 in shared Database db, 106 in bool isSnapshot) 107 in 108 { 109 enforce(db !is null, "db must be set"); 110 enforce(th !is null, "th must be set"); 111 } 112 body 113 { 114 this.th = cast(shared)th; 115 this.db = db; 116 this._isSnapshot = isSnapshot; 117 } 118 119 @property shared(IReadOnlyTransaction) snapshot() 120 { 121 auto snapshot = new shared Transaction(th, db, true); 122 return cast(shared IReadOnlyTransaction)snapshot; 123 } 124 125 ~this() 126 { 127 dispose; 128 } 129 130 void dispose() 131 { 132 // parent transaction should handle destruction 133 if (!th || isSnapshot) return; 134 135 fdb_transaction_destroy(cast(TransactionHandle)th); 136 th = null; 137 } 138 139 void set(in Key key, in Value value) const 140 in 141 { 142 enforce(key !is null); 143 enforce(!key.empty); 144 enforce(value !is null); 145 enforce(!value.empty); 146 } 147 body 148 { 149 fdb_transaction_set( 150 cast(TransactionHandle)th, 151 &key[0], 152 cast(int)key.length, 153 &value[0], 154 cast(int)value.length); 155 } 156 157 void commit() 158 { 159 // cancel, commit and reset are mutually exclusive 160 synchronized (this) 161 { 162 auto fh = fdb_transaction_commit(cast(TransactionHandle)th); 163 auto future = createFuture!VoidFuture(fh, this); 164 future.await; 165 } 166 } 167 168 auto commitAsync(VoidFutureCallback callback = null) 169 { 170 // cancel, commit and reset are mutually exclusive 171 synchronized (this) 172 { 173 auto fh = fdb_transaction_commit(cast(TransactionHandle)th); 174 auto future = startOrCreateFuture!VoidFuture(fh, this, callback); 175 futures ~= future; 176 return future; 177 } 178 } 179 180 void cancel() 181 { 182 // cancel, commit and reset are mutually exclusive 183 synchronized (this) 184 fdb_transaction_cancel(cast(TransactionHandle)th); 185 } 186 187 /** 188 * Resets transaction to its initial state 189 */ 190 void reset() 191 { 192 // cancel, commit and reset are mutually exclusive 193 synchronized (this) 194 fdb_transaction_reset(cast(TransactionHandle)th); 195 } 196 197 void clear(in Key key) const 198 in 199 { 200 enforce(key !is null); 201 enforce(!key.empty); 202 } 203 body 204 { 205 fdb_transaction_clear( 206 cast(TransactionHandle)th, 207 &key[0], 208 cast(int)key.length); 209 } 210 211 void clearRange(in RangeInfo info) const 212 in 213 { 214 enforce(!info.begin.key.empty); 215 enforce(!info.end.key.empty); 216 } 217 body 218 { 219 fdb_transaction_clear_range( 220 cast(TransactionHandle)th, 221 &info.begin.key[0], 222 cast(int)info.begin.key.length, 223 &info.end.key[0], 224 cast(int)info.end.key.length); 225 } 226 227 shared(Key) getKey(in Selector selector) 228 { 229 auto fh = fdb_transaction_get_key( 230 cast(TransactionHandle)th, 231 &selector.key[0], 232 cast(int)selector.key.length, 233 cast(fdb_bool_t)selector.orEqual, 234 selector.offset, 235 cast(fdb_bool_t)_isSnapshot); 236 237 scope auto future = createFuture!KeyFuture(fh, this); 238 239 auto value = future.await; 240 return value; 241 } 242 243 shared(KeyFuture) getKeyAsync( 244 in Selector selector, 245 KeyFutureCallback callback = null) 246 { 247 auto fh = fdb_transaction_get_key( 248 cast(TransactionHandle)th, 249 &selector.key[0], 250 cast(int)selector.key.length, 251 cast(fdb_bool_t)selector.orEqual, 252 selector.offset, 253 cast(fdb_bool_t)_isSnapshot); 254 255 auto future = startOrCreateFuture!KeyFuture(fh, this, callback); 256 synchronized (this) 257 futures ~= future; 258 return future; 259 } 260 261 shared(Value) get(in Key key) 262 in 263 { 264 enforce(key !is null); 265 enforce(!key.empty); 266 } 267 body 268 { 269 auto fh = fdb_transaction_get( 270 cast(TransactionHandle)th, 271 &key[0], 272 cast(int)key.length, 273 cast(fdb_bool_t)_isSnapshot); 274 275 scope auto future = createFuture!ValueFuture(fh, this); 276 277 auto value = future.await; 278 return value; 279 } 280 281 shared(ValueFuture) getAsync(in Key key, ValueFutureCallback callback = null) 282 in 283 { 284 enforce(key !is null); 285 enforce(!key.empty); 286 } 287 body 288 { 289 auto fh = fdb_transaction_get( 290 cast(TransactionHandle)th, 291 &key[0], 292 cast(int)key.length, 293 cast(fdb_bool_t)_isSnapshot); 294 295 auto future = startOrCreateFuture!ValueFuture(fh, this, callback); 296 synchronized (this) 297 futures ~= future; 298 return future; 299 } 300 301 /** 302 * Returns: Key-value pairs within (begin, end) range 303 */ 304 RecordRange getRange(RangeInfo info) 305 { 306 auto begin = sanitizeKey(info.begin.key, [ 0x00 ]); 307 auto end = sanitizeKey(info.end.key, [ 0xff ]); 308 309 auto fh = fdb_transaction_get_range( 310 cast(TransactionHandle)cast(TransactionHandle)th, 311 312 &begin[0], 313 cast(int)begin.length, 314 cast(fdb_bool_t)info.begin.orEqual, 315 info.begin.offset, 316 317 &end[0], 318 cast(int)end.length, 319 cast(fdb_bool_t)info.end.orEqual, 320 info.end.offset, 321 322 info.limit, 323 0, 324 info.mode, 325 info.iteration, 326 cast(fdb_bool_t)_isSnapshot, 327 info.reverse); 328 329 scope auto future = createFuture!KeyValueFuture(fh, this, info); 330 331 auto value = cast(RecordRange)future.await; 332 return value; 333 } 334 335 /** 336 * Returns: Key-value pairs within (begin, end) range 337 */ 338 shared(KeyValueFuture) getRangeAsync( 339 RangeInfo info, 340 KeyValueFutureCallback callback = null) 341 { 342 auto begin = sanitizeKey(info.begin.key, [ 0x00 ]); 343 auto end = sanitizeKey(info.end.key, [ 0xff ]); 344 345 auto fh = fdb_transaction_get_range( 346 cast(TransactionHandle)cast(TransactionHandle)th, 347 348 &begin[0], 349 cast(int)begin.length, 350 cast(fdb_bool_t)info.begin.orEqual, 351 info.begin.offset, 352 353 &end[0], 354 cast(int)end.length, 355 cast(fdb_bool_t)info.end.orEqual, 356 info.end.offset, 357 358 info.limit, 359 0, 360 info.mode, 361 info.iteration, 362 cast(fdb_bool_t)_isSnapshot, 363 info.reverse); 364 365 auto future = startOrCreateFuture!KeyValueFuture( 366 fh, this, info, callback); 367 synchronized (this) 368 futures ~= future; 369 return future; 370 } 371 372 auto watch(in Key key, VoidFutureCallback callback = null) 373 in 374 { 375 enforce(key !is null); 376 enforce(!key.empty); 377 } 378 body 379 { 380 auto fh = fdb_transaction_watch( 381 cast(TransactionHandle)th, 382 &key[0], 383 cast(int)key.length); 384 auto future = startOrCreateFuture!WatchFuture(fh, this, callback); 385 synchronized (this) 386 futures ~= future; 387 return future; 388 } 389 390 private void addConflictRange( 391 RangeInfo info, 392 in ConflictRangeType type) const 393 in 394 { 395 enforce(!info.begin.key.empty); 396 enforce(!info.end.key.empty); 397 } 398 body 399 { 400 auto err = fdb_transaction_add_conflict_range( 401 cast(TransactionHandle)th, 402 &info.begin.key[0], 403 cast(int)info.begin.key.length, 404 &info.end.key[0], 405 cast(int)info.end.key.length, 406 type); 407 enforceError(err); 408 } 409 410 void addReadConflictRange(RangeInfo info) const 411 { 412 addConflictRange(info, ConflictRangeType.READ); 413 } 414 415 void addWriteConflictRange(RangeInfo info) const 416 { 417 addConflictRange(info, ConflictRangeType.WRITE); 418 } 419 420 void onError(in FDBException ex) 421 { 422 onError(ex.err); 423 } 424 425 private void onError(const fdb_error_t err) 426 { 427 auto fh = fdb_transaction_on_error( 428 cast(TransactionHandle)th, 429 err); 430 431 scope auto future = createFuture!VoidFuture(fh, this); 432 future.await; 433 } 434 435 shared(VoidFuture) onErrorAsync( 436 in FDBException ex, 437 VoidFutureCallback callback = null) 438 { 439 auto fh = fdb_transaction_on_error( 440 cast(TransactionHandle)th, 441 ex.err); 442 auto future = startOrCreateFuture!VoidFuture(fh, this, callback); 443 synchronized (this) 444 futures ~= future; 445 return future; 446 } 447 448 void setReadVersion(in int ver) const 449 in 450 { 451 enforce(ver > 0); 452 } 453 body 454 { 455 fdb_transaction_set_read_version( 456 cast(TransactionHandle)th, 457 ver); 458 } 459 460 ulong getReadVersion() 461 { 462 auto fh = fdb_transaction_get_read_version( 463 cast(TransactionHandle)th); 464 465 scope auto future = createFuture!VersionFuture(fh, this); 466 467 auto value = future.await; 468 return value; 469 } 470 471 shared(VersionFuture) getReadVersionAsync(VersionFutureCallback callback = null) 472 { 473 auto fh = fdb_transaction_get_read_version( 474 cast(TransactionHandle)th); 475 auto future = startOrCreateFuture!VersionFuture(fh, this, callback); 476 synchronized (this) 477 futures ~= future; 478 return future; 479 } 480 481 long getCommittedVersion() const 482 { 483 long ver; 484 auto err = fdb_transaction_get_committed_version( 485 cast(TransactionHandle)th, 486 &ver); 487 enforceError(err); 488 return ver; 489 } 490 491 shared(string[]) getAddressesForKey(in Key key) 492 in 493 { 494 enforce(key !is null); 495 enforce(!key.empty); 496 } 497 body 498 { 499 auto fh = fdb_transaction_get_addresses_for_key( 500 cast(TransactionHandle)th, 501 &key[0], 502 cast(int)key.length); 503 504 scope auto future = createFuture!StringFuture(fh, this); 505 506 auto value = future.await; 507 return value; 508 } 509 510 shared(StringFuture) getAddressesForKeyAsync( 511 in Key key, 512 StringFutureCallback callback = null) 513 in 514 { 515 enforce(key !is null); 516 enforce(!key.empty); 517 } 518 body 519 { 520 auto fh = fdb_transaction_get_addresses_for_key( 521 cast(TransactionHandle)th, 522 &key[0], 523 cast(int)key.length); 524 525 auto future = startOrCreateFuture!StringFuture(fh, this, callback); 526 synchronized (this) 527 futures ~= future; 528 return future; 529 } 530 531 /** 532 * Performs an addition of little-endian integers. If the existing value 533 * in the database is not present or shorter than ``param``, it is first 534 * extended to the length of ``param`` with zero bytes. If ``param`` is 535 * shorter than the existing value in the database, the existing value is 536 * truncated to match the length of ``param``. The integers to be added 537 * must be stored in a little-endian representation. They can be signed 538 * in two's complement representation or unsigned. You can add to an integer 539 * at a known offset in the value by prepending the appropriate number of 540 * zero bytes to ``param`` and padding with zero bytes to match the length 541 * of the value. However, this offset technique requires that you know the 542 * addition will not cause the integer field within the value to overflow. 543 */ 544 void add(in Key key, in Value value) const 545 { 546 callAtomicOperation(key, value, MutationType.ADD); 547 } 548 549 /** 550 * Performs a bitwise ``and`` operation. If the existing value in the 551 * database is not present or shorter than ``param``, it is first extended 552 * to the length of ``param`` with zero bytes. If ``param`` is shorter than 553 * the existing value in the database, the existing value is truncated to 554 * match the length of ``param``. 555 */ 556 void bitAnd(in Key key, in Value value) const 557 { 558 callAtomicOperation(key, value, MutationType.BIT_AND); 559 } 560 561 /** 562 * Performs a bitwise ``or`` operation. If the existing value in the 563 * database is not present or shorter than ``param``, it is first extended 564 * to the length of ``param`` with zero bytes. If ``param`` is shorter than 565 * the existing value in the database, the existing value is truncated to 566 * match the length of ``param``. 567 */ 568 void bitOr(in Key key, in Value value) const 569 { 570 callAtomicOperation(key, value, MutationType.BIT_OR); 571 } 572 573 /** 574 * Performs a bitwise ``xor`` operation. If the existing value in the 575 * database is not present or shorter than ``param``, it is first extended 576 * to the length of ``param`` with zero bytes. If ``param`` is shorter than 577 * the existing value in the database, the existing value is truncated to 578 * match the length of ``param``. 579 */ 580 void bitXor(in Key key, in Value value) const 581 { 582 callAtomicOperation(key, value, MutationType.BIT_XOR); 583 } 584 585 /** 586 * Performs a little-endian comparison of byte strings. 587 * If the existing value in the database is not present or shorter than 588 * ``param``, it is first extended to the length of ``param`` with zero 589 * bytes. 590 * If ``param`` is shorter than the existing value in the database, the 591 * existing value is truncated to match the length of ``param``. 592 * The larger of the two values is then stored in the database. 593 */ 594 void bitMax(in Key key, in Value value) const 595 { 596 callAtomicOperation(key, value, MutationType.MAX); 597 } 598 599 /** 600 * Performs a little-endian comparison of byte strings. 601 * If the existing value in the database is not present or shorter than 602 * ``param``, it is first extended to the length of ``param`` with zero 603 * bytes. 604 * If ``param`` is shorter than the existing value in the database, the 605 * existing value is truncated to match the length of ``param``. 606 * The smaller of the two values is then stored in the database. 607 */ 608 void bitMin(in Key key, in Value value) const 609 { 610 callAtomicOperation(key, value, MutationType.MIN); 611 } 612 613 private void callAtomicOperation( 614 in Key key, 615 in Value value, 616 in MutationType type) const 617 in 618 { 619 enforce(key !is null); 620 enforce(!key.empty); 621 enforce(value !is null); 622 enforce(!value.empty); 623 } 624 body 625 { 626 fdb_transaction_atomic_op( 627 cast(TransactionHandle)th, 628 &key[0], 629 cast(int)key.length, 630 &value[0], 631 cast(int)value.length, 632 type); 633 } 634 635 /** 636 * The transaction, if not self-conflicting, may be committed a second time 637 * after commit succeeds, in the event of a fault 638 */ 639 void setCausalWriteRisky() const 640 { 641 setTransactionOption(TransactionOption.CAUSAL_WRITE_RISKY); 642 } 643 644 /** 645 * The read version will be committed, and usually will be the latest 646 * committed, but might not be the latest committed in the event of a fault 647 * or partition 648 */ 649 void setCausalReadRisky() const 650 { 651 setTransactionOption(TransactionOption.CAUSAL_READ_RISKY); 652 } 653 654 void setCausalReadDisable() const 655 { 656 setTransactionOption(TransactionOption.CAUSAL_READ_DISABLE); 657 } 658 659 /** 660 * The next write performed on this transaction will not generate a write 661 * conflict range. As a result, other transactions which read the key(s) 662 * being modified by the next write will not conflict with this transaction. 663 * Care needs to be taken when using this option on a transaction that is 664 * shared between multiple threads. When setting this option, write conflict 665 * ranges will be disabled on the next write operation, regardless of what 666 * thread it is on. 667 */ 668 void setNextWriteNoWriteConflictRange() const 669 { 670 setTransactionOption( 671 TransactionOption.NEXT_WRITE_NO_WRITE_CONFLICT_RANGE); 672 } 673 674 void setCheckWritesEnable() const 675 { 676 setTransactionOption(TransactionOption.CHECK_WRITES_ENABLE); 677 } 678 679 /** 680 * Reads performed by a transaction will not see any prior mutations that 681 * occured in that transaction, instead seeing the value which was in the 682 * database at the transaction's read version. This option may provide a 683 * small performance benefit for the client, but also disables a number of 684 * client-side optimizations which are beneficial for transactions which 685 * tend to read and write the same keys within a single transaction. 686 */ 687 void setReadYourWritesDisable() const 688 { 689 setTransactionOption(TransactionOption.READ_YOUR_WRITES_DISABLE); 690 } 691 692 /** 693 * Disables read-ahead caching for range reads. Under normal operation, a 694 * transaction will read extra rows from the database into cache if range 695 * reads are used to page through a series of data one row at a time (i.e. 696 * if a range read with a one row limit is followed by another one row range 697 * read starting immediately after the result of the first). 698 */ 699 void setReadAheadDisable() const 700 { 701 setTransactionOption(TransactionOption.READ_AHEAD_DISABLE); 702 } 703 704 /** 705 * Specifies that this transaction should be treated as highest priority and 706 * that lower priority transactions should block behind this one. Use is 707 * discouraged outside of low-level tools 708 */ 709 void setPrioritySystemImmediate() const 710 { 711 setTransactionOption(TransactionOption.PRIORITY_SYSTEM_IMMEDIATE); 712 } 713 714 /** 715 * Specifies that this transaction should be treated as low priority and 716 * that default priority transactions should be processed first. Useful for 717 * doing batch work simultaneously with latency-sensitive work 718 */ 719 void setPriorityBatch() const 720 { 721 setTransactionOption(TransactionOption.PRIORITY_BATCH); 722 } 723 724 /** 725 * This is a write-only transaction which sets the initial configuration 726 */ 727 void setInitializeNewDatabase() const 728 { 729 setTransactionOption(TransactionOption.INITIALIZE_NEW_DATABASE); 730 } 731 732 /** 733 * Allows this transaction to read and modify system keys (those that start 734 * with the byte 0xFF) 735 */ 736 void setAccessSystemKeys() const 737 { 738 setTransactionOption(TransactionOption.ACCESS_SYSTEM_KEYS); 739 } 740 741 /** 742 * Allows this transaction to read system keys (those that start with the 743 * byte 0xFF) 744 */ 745 void setReadSystemKeys() const 746 { 747 setTransactionOption(TransactionOption.READ_SYSTEM_KEYS); 748 } 749 750 void setDebugDump() const 751 { 752 setTransactionOption(TransactionOption.DEBUG_DUMP); 753 } 754 755 /** 756 * Params: 757 * transactionName = Optional transaction name 758 */ 759 void setDebugRetryLogging(in string transactionName = null) const 760 { 761 setTransactionOption( 762 TransactionOption.DEBUG_RETRY_LOGGING, 763 transactionName); 764 } 765 766 /** 767 * Set a timeout in milliseconds which, when elapsed, will cause the 768 * transaction automatically to be cancelled. Valid parameter values are 769 * ``[0, INT_MAX]``. If set to 0, will disable all timeouts. All pending and 770 * any future uses of the transaction will throw an exception. The 771 * transaction can be used again after it is reset. 772 * Params: 773 * value = value in milliseconds of timeout 774 */ 775 void setTimeout(in int value) const 776 { 777 setTransactionOption(TransactionOption.TIMEOUT, value); 778 } 779 780 /** 781 * Set a maximum number of retries after which additional calls to onError 782 * will throw the most recently seen error code. Valid parameter values are 783 * ``[-1, INT_MAX]``. If set to -1, will disable the retry limit. 784 * Params: 785 * value = number of times to retry 786 */ 787 void setRetryLimit(in int value) const 788 { 789 setTransactionOption(TransactionOption.RETRY_LIMIT, value); 790 } 791 792 /** 793 * Set the maximum amount of backoff delay incurred in the call to onError 794 * if the error is retryable. 795 * Defaults to 1000 ms. Valid parameter values are [0, int.MaxValue]. 796 * Like all transaction options, the maximum retry delay must be reset 797 * after a call to onError. 798 * If the maximum retry delay is less than the current retry delay of the 799 * transaction, then the current retry delay will be clamped to the maximum 800 * retry delay. 801 * Params: 802 * value = value in milliseconds of maximum delay 803 */ 804 void setMaxRetryDelayLimit(in int value) const 805 { 806 setTransactionOption(TransactionOption.MAX_RETRY_DELAY, value); 807 } 808 809 /** 810 * Snapshot read operations will see the results of writes done in the same 811 * transaction. 812 */ 813 void setSnapshotReadYourWriteEnable() const 814 { 815 setTransactionOption(TransactionOption.SNAPSHOT_READ_YOUR_WRITE_ENABLE); 816 } 817 818 /** 819 * Snapshot read operations will not see the results of writes done in the 820 * same transaction. 821 */ 822 void setSnapshotReadYourWriteDisable() const 823 { 824 setTransactionOption( 825 TransactionOption.SNAPSHOT_READ_YOUR_WRITE_DISABLE); 826 } 827 828 private void setTransactionOption(in TransactionOption op) const 829 { 830 auto err = fdb_transaction_set_option( 831 cast(TransactionHandle)th, 832 op, 833 null, 834 0); 835 enforceError(err); 836 } 837 838 private void setTransactionOption( 839 in TransactionOption op, 840 in int value) const 841 { 842 auto err = fdb_transaction_set_option( 843 cast(TransactionHandle)th, 844 op, 845 cast(immutable(char)*)&value, 846 cast(int)value.sizeof); 847 enforceError(err); 848 } 849 850 private void setTransactionOption( 851 in TransactionOption op, 852 in string value) const 853 { 854 auto err = fdb_transaction_set_option( 855 cast(TransactionHandle)th, 856 op, 857 value.toStringz, 858 cast(int)value.length); 859 enforceError(err); 860 } 861 862 shared(Value) opIndex(in Key key) 863 { 864 return get(key); 865 } 866 867 RecordRange opIndex(RangeInfo info) 868 { 869 return getRange(info); 870 } 871 872 inout(Value) opIndexAssign(inout(Value) value, in Key key) 873 { 874 set(key, value); 875 return value; 876 } 877 878 void run(in WorkFunc func) 879 { 880 retryLoop(this, func); 881 }; 882 883 auto runAsync(in WorkFunc func, in VoidFutureCallback commitCallback) 884 { 885 auto future = createFuture!retryLoopAsync(this, func, commitCallback); 886 return future; 887 }; 888 } 889 890 void retryLoop(shared Transaction tr, in WorkFunc func) 891 { 892 while (true) 893 { 894 try 895 { 896 func(tr); 897 tr.commit; 898 return; 899 } 900 catch (Exception ex) 901 { 902 if (auto fdbex = cast(FDBException)ex) 903 tr.onError(fdbex); 904 else 905 { 906 // the error cannot be retried so we need to cancel the 907 // transaction 908 tr.cancel; 909 throw ex; 910 } 911 } 912 } 913 } 914 915 void retryLoopAsync( 916 shared Transaction tr, 917 in WorkFunc func, 918 in VoidFutureCallback cb) 919 { 920 while (true) 921 { 922 try 923 { 924 func(tr); 925 tr.commit; 926 cb(null); 927 return; 928 } 929 catch (Exception ex) 930 { 931 if (auto fdbex = cast(FDBException)ex) 932 tr.onError(fdbex); 933 else 934 { 935 // the error cannot be retried so we need to cancel the 936 // transaction 937 tr.cancel; 938 cb(cast(Exception) ex); 939 return; 940 } 941 } 942 } 943 }