1 module fdb.transaction;
2 
3 import
4     std.array,
5     std.conv,
6     std.exception,
7     std.string;
8 
9 import
10     fdb.context,
11     fdb.database,
12     fdb.disposable,
13     fdb.error,
14     fdb.fdb_c,
15     fdb.fdb_c_options,
16     fdb.future,
17     fdb.range,
18     fdb.rangeinfo;
19 
20 shared interface IReadOnlyTransaction
21 {
22     @property bool isSnapshot();
23 
24     shared(Key) getKey(in Selector selector);
25     shared(KeyFuture) getKeyAsync(
26         in Selector       selector,
27         KeyFutureCallback callback = null);
28 
29     shared(Value) get(in Key key);
30     shared(ValueFuture) getAsync(
31         in Key              key,
32         ValueFutureCallback callback = null);
33 
34     /**
35      * Returns: Key-value pairs within (begin, end) range
36      */
37     RecordRange getRange(RangeInfo info);
38     /// ditto
39     shared(KeyValueFuture) getRangeAsync(
40         RangeInfo              info,
41         KeyValueFutureCallback callback = null);
42 
43     void addReadConflictRange(RangeInfo info);
44     void addWriteConflictRange(RangeInfo info);
45 
46     void onError(in FDBException ex);
47     shared(VoidFuture) onErrorAsync(
48         in FDBException    ex,
49         VoidFutureCallback callback = null);
50 
51     ulong getReadVersion();
52     shared(VersionFuture) getReadVersionAsync(
53         VersionFutureCallback callback = null);
54 
55     long getCommittedVersion();
56 
57     shared(string[]) getAddressesForKey(in Key key);
58     shared(StringFuture) getAddressesForKeyAsync(
59         in Key               key,
60         StringFutureCallback callback = null);
61 
62     shared(Value) opIndex(in Key key);
63 
64     RecordRange opIndex(RangeInfo info);
65 }
66 
67 shared class Transaction : IDatabaseContext, IDisposable, IReadOnlyTransaction
68 {
69     private const Database    db;
70     private TransactionHandle th;
71 
72     private const bool _isSnapshot;
73     @property bool isSnapshot()
74     {
75         return _isSnapshot;
76     }
77 
78     private IDisposable[] futures;
79 
80     invariant()
81     {
82         assert(db !is null);
83     }
84 
85     this(TransactionHandle th, in shared Database db)
86     in
87     {
88         enforce(db !is null, "db must be set");
89         enforce(th !is null, "th must be set");
90     }
91     body
92     {
93         this.th          = cast(shared)th;
94         this.db          = db;
95         this._isSnapshot = false;
96     }
97 
98     invariant()
99     {
100         assert(db !is null);
101     }
102 
103     private this(
104         shared TransactionHandle th,
105         in shared Database       db,
106         in bool                  isSnapshot)
107     in
108     {
109         enforce(db !is null, "db must be set");
110         enforce(th !is null, "th must be set");
111     }
112     body
113     {
114         this.th          = cast(shared)th;
115         this.db          = db;
116         this._isSnapshot = isSnapshot;
117     }
118 
119     @property shared(IReadOnlyTransaction) snapshot()
120     {
121         auto snapshot = new shared Transaction(th, db, true);
122         return cast(shared IReadOnlyTransaction)snapshot;
123     }
124 
125     ~this()
126     {
127         dispose;
128     }
129 
130     void dispose()
131     {
132         // parent transaction should handle destruction
133         if (!th || isSnapshot) return;
134 
135         fdb_transaction_destroy(cast(TransactionHandle)th);
136         th = null;
137     }
138 
139     void set(in Key key, in Value value) const
140     in
141     {
142         enforce(key !is null);
143         enforce(!key.empty);
144         enforce(value !is null);
145         enforce(!value.empty);
146     }
147     body
148     {
149         fdb_transaction_set(
150             cast(TransactionHandle)th,
151             &key[0],
152             cast(int)key.length,
153             &value[0],
154             cast(int)value.length);
155     }
156 
157     void commit()
158     {
159         // cancel, commit and reset are mutually exclusive
160         synchronized (this)
161         {
162             auto fh     = fdb_transaction_commit(cast(TransactionHandle)th);
163             auto future = createFuture!VoidFuture(fh, this);
164             future.await;
165         }
166     }
167 
168     auto commitAsync(VoidFutureCallback callback = null)
169     {
170         // cancel, commit and reset are mutually exclusive
171         synchronized (this)
172         {
173             auto fh      = fdb_transaction_commit(cast(TransactionHandle)th);
174             auto future  = startOrCreateFuture!VoidFuture(fh, this, callback);
175             futures     ~= future;
176             return future;
177         }
178     }
179 
180     void cancel()
181     {
182         // cancel, commit and reset are mutually exclusive
183         synchronized (this)
184             fdb_transaction_cancel(cast(TransactionHandle)th);
185     }
186 
187     /**
188      * Resets transaction to its initial state
189      */
190     void reset()
191     {
192         // cancel, commit and reset are mutually exclusive
193         synchronized (this)
194             fdb_transaction_reset(cast(TransactionHandle)th);
195     }
196 
197     void clear(in Key key) const
198     in
199     {
200         enforce(key !is null);
201         enforce(!key.empty);
202     }
203     body
204     {
205         fdb_transaction_clear(
206             cast(TransactionHandle)th,
207             &key[0],
208             cast(int)key.length);
209     }
210 
211     void clearRange(in RangeInfo info) const
212     in
213     {
214         enforce(!info.begin.key.empty);
215         enforce(!info.end.key.empty);
216     }
217     body
218     {
219         fdb_transaction_clear_range(
220             cast(TransactionHandle)th,
221             &info.begin.key[0],
222             cast(int)info.begin.key.length,
223             &info.end.key[0],
224             cast(int)info.end.key.length);
225     }
226 
227     shared(Key) getKey(in Selector selector)
228     {
229         auto fh = fdb_transaction_get_key(
230             cast(TransactionHandle)th,
231             &selector.key[0],
232             cast(int)selector.key.length,
233             cast(fdb_bool_t)selector.orEqual,
234             selector.offset,
235             cast(fdb_bool_t)_isSnapshot);
236 
237         scope auto future = createFuture!KeyFuture(fh, this);
238 
239         auto value = future.await;
240         return value;
241     }
242 
243     shared(KeyFuture) getKeyAsync(
244         in Selector       selector,
245         KeyFutureCallback callback = null)
246     {
247         auto fh = fdb_transaction_get_key(
248             cast(TransactionHandle)th,
249             &selector.key[0],
250             cast(int)selector.key.length,
251             cast(fdb_bool_t)selector.orEqual,
252             selector.offset,
253             cast(fdb_bool_t)_isSnapshot);
254 
255         auto future = startOrCreateFuture!KeyFuture(fh, this, callback);
256         synchronized (this)
257             futures ~= future;
258         return future;
259     }
260 
261     shared(Value) get(in Key key)
262     in
263     {
264         enforce(key !is null);
265         enforce(!key.empty);
266     }
267     body
268     {
269         auto fh = fdb_transaction_get(
270             cast(TransactionHandle)th,
271             &key[0],
272             cast(int)key.length,
273             cast(fdb_bool_t)_isSnapshot);
274 
275         scope auto future = createFuture!ValueFuture(fh, this);
276 
277         auto value = future.await;
278         return value;
279     }
280 
281     shared(ValueFuture) getAsync(in Key key, ValueFutureCallback callback = null)
282     in
283     {
284         enforce(key !is null);
285         enforce(!key.empty);
286     }
287     body
288     {
289         auto fh = fdb_transaction_get(
290             cast(TransactionHandle)th,
291             &key[0],
292             cast(int)key.length,
293             cast(fdb_bool_t)_isSnapshot);
294 
295         auto future = startOrCreateFuture!ValueFuture(fh, this, callback);
296         synchronized (this)
297             futures ~= future;
298         return future;
299     }
300 
301     /**
302      * Returns: Key-value pairs within (begin, end) range
303      */
304     RecordRange getRange(RangeInfo info)
305     {
306         auto begin = sanitizeKey(info.begin.key, [ 0x00 ]);
307         auto end   = sanitizeKey(info.end.key, [ 0xff ]);
308 
309         auto fh = fdb_transaction_get_range(
310             cast(TransactionHandle)cast(TransactionHandle)th,
311 
312             &begin[0],
313             cast(int)begin.length,
314             cast(fdb_bool_t)info.begin.orEqual,
315             info.begin.offset,
316 
317             &end[0],
318             cast(int)end.length,
319             cast(fdb_bool_t)info.end.orEqual,
320             info.end.offset,
321 
322             info.limit,
323             0,
324             info.mode,
325             info.iteration,
326             cast(fdb_bool_t)_isSnapshot,
327             info.reverse);
328 
329         scope auto future = createFuture!KeyValueFuture(fh, this, info);
330 
331         auto value = cast(RecordRange)future.await;
332         return value;
333     }
334 
335     /**
336      * Returns: Key-value pairs within (begin, end) range
337      */
338     shared(KeyValueFuture) getRangeAsync(
339         RangeInfo              info,
340         KeyValueFutureCallback callback = null)
341     {
342         auto begin = sanitizeKey(info.begin.key, [ 0x00 ]);
343         auto end   = sanitizeKey(info.end.key, [ 0xff ]);
344 
345         auto fh = fdb_transaction_get_range(
346             cast(TransactionHandle)cast(TransactionHandle)th,
347 
348             &begin[0],
349             cast(int)begin.length,
350             cast(fdb_bool_t)info.begin.orEqual,
351             info.begin.offset,
352 
353             &end[0],
354             cast(int)end.length,
355             cast(fdb_bool_t)info.end.orEqual,
356             info.end.offset,
357 
358             info.limit,
359             0,
360             info.mode,
361             info.iteration,
362             cast(fdb_bool_t)_isSnapshot,
363             info.reverse);
364 
365         auto future = startOrCreateFuture!KeyValueFuture(
366             fh, this, info, callback);
367         synchronized (this)
368             futures ~= future;
369         return future;
370     }
371 
372     auto watch(in Key key, VoidFutureCallback callback = null)
373     in
374     {
375         enforce(key !is null);
376         enforce(!key.empty);
377     }
378     body
379     {
380         auto fh = fdb_transaction_watch(
381             cast(TransactionHandle)th,
382             &key[0],
383             cast(int)key.length);
384         auto future = startOrCreateFuture!WatchFuture(fh, this, callback);
385         synchronized (this)
386             futures ~= future;
387         return future;
388     }
389 
390     private void addConflictRange(
391         RangeInfo            info,
392         in ConflictRangeType type) const
393     in
394     {
395         enforce(!info.begin.key.empty);
396         enforce(!info.end.key.empty);
397     }
398     body
399     {
400         auto err = fdb_transaction_add_conflict_range(
401             cast(TransactionHandle)th,
402             &info.begin.key[0],
403             cast(int)info.begin.key.length,
404             &info.end.key[0],
405             cast(int)info.end.key.length,
406             type);
407         enforceError(err);
408     }
409 
410     void addReadConflictRange(RangeInfo info) const
411     {
412         addConflictRange(info, ConflictRangeType.READ);
413     }
414 
415     void addWriteConflictRange(RangeInfo info) const
416     {
417         addConflictRange(info, ConflictRangeType.WRITE);
418     }
419 
420     void onError(in FDBException ex)
421     {
422         onError(ex.err);
423     }
424 
425     private void onError(const fdb_error_t err)
426     {
427         auto fh = fdb_transaction_on_error(
428             cast(TransactionHandle)th,
429             err);
430 
431         scope auto future = createFuture!VoidFuture(fh, this);
432         future.await;
433     }
434 
435     shared(VoidFuture) onErrorAsync(
436         in FDBException    ex,
437         VoidFutureCallback callback = null)
438     {
439         auto fh = fdb_transaction_on_error(
440             cast(TransactionHandle)th,
441             ex.err);
442         auto future = startOrCreateFuture!VoidFuture(fh, this, callback);
443         synchronized (this)
444             futures ~= future;
445         return future;
446     }
447 
448     void setReadVersion(in int ver) const
449     in
450     {
451         enforce(ver > 0);
452     }
453     body
454     {
455         fdb_transaction_set_read_version(
456             cast(TransactionHandle)th,
457             ver);
458     }
459 
460     ulong getReadVersion()
461     {
462         auto fh = fdb_transaction_get_read_version(
463             cast(TransactionHandle)th);
464 
465         scope auto future = createFuture!VersionFuture(fh, this);
466 
467         auto value = future.await;
468         return value;
469     }
470 
471     shared(VersionFuture) getReadVersionAsync(VersionFutureCallback callback = null)
472     {
473         auto fh = fdb_transaction_get_read_version(
474             cast(TransactionHandle)th);
475         auto future = startOrCreateFuture!VersionFuture(fh, this, callback);
476         synchronized (this)
477             futures ~= future;
478         return future;
479     }
480 
481     long getCommittedVersion() const
482     {
483         long ver;
484         auto err = fdb_transaction_get_committed_version(
485             cast(TransactionHandle)th,
486             &ver);
487         enforceError(err);
488         return ver;
489     }
490 
491     shared(string[]) getAddressesForKey(in Key key)
492     in
493     {
494         enforce(key !is null);
495         enforce(!key.empty);
496     }
497     body
498     {
499         auto fh = fdb_transaction_get_addresses_for_key(
500             cast(TransactionHandle)th,
501             &key[0],
502             cast(int)key.length);
503 
504         scope auto future = createFuture!StringFuture(fh, this);
505 
506         auto value = future.await;
507         return value;
508     }
509 
510     shared(StringFuture) getAddressesForKeyAsync(
511         in Key               key,
512         StringFutureCallback callback = null)
513     in
514     {
515         enforce(key !is null);
516         enforce(!key.empty);
517     }
518     body
519     {
520         auto fh = fdb_transaction_get_addresses_for_key(
521             cast(TransactionHandle)th,
522             &key[0],
523             cast(int)key.length);
524 
525         auto future = startOrCreateFuture!StringFuture(fh, this, callback);
526         synchronized (this)
527             futures ~= future;
528         return future;
529     }
530 
531     /**
532      * Performs an addition of little-endian integers. If the existing value
533      * in the database is not present or shorter than ``param``, it is first
534      * extended to the length of ``param`` with zero bytes.  If ``param`` is
535      * shorter than the existing value in the database, the existing value is
536      * truncated to match the length of ``param``. The integers to be added
537      * must be stored in a little-endian representation.  They can be signed
538      * in two's complement representation or unsigned. You can add to an integer
539      * at a known offset in the value by prepending the appropriate number of
540      * zero bytes to ``param`` and padding with zero bytes to match the length
541      * of the value. However, this offset technique requires that you know the
542      * addition will not cause the integer field within the value to overflow.
543      */
544     void add(in Key key, in Value value) const
545     {
546         callAtomicOperation(key, value, MutationType.ADD);
547     }
548 
549     /**
550      * Performs a bitwise ``and`` operation.  If the existing value in the
551      * database is not present or shorter than ``param``, it is first extended
552      * to the length of ``param`` with zero bytes.  If ``param`` is shorter than
553      * the existing value in the database, the existing value is truncated to
554      * match the length of ``param``.
555      */
556     void bitAnd(in Key key, in Value value) const
557     {
558         callAtomicOperation(key, value, MutationType.BIT_AND);
559     }
560 
561     /**
562      * Performs a bitwise ``or`` operation.  If the existing value in the
563      * database is not present or shorter than ``param``, it is first extended
564      * to the length of ``param`` with zero bytes.  If ``param`` is shorter than
565      * the existing value in the database, the existing value is truncated to
566      * match the length of ``param``.
567      */
568     void bitOr(in Key key, in Value value) const
569     {
570         callAtomicOperation(key, value, MutationType.BIT_OR);
571     }
572 
573     /**
574      * Performs a bitwise ``xor`` operation.  If the existing value in the
575      * database is not present or shorter than ``param``, it is first extended
576      * to the length of ``param`` with zero bytes.  If ``param`` is shorter than
577      * the existing value in the database, the existing value is truncated to
578      * match the length of ``param``.
579      */
580     void bitXor(in Key key, in Value value) const
581     {
582         callAtomicOperation(key, value, MutationType.BIT_XOR);
583     }
584 
585     /**
586      * Performs a little-endian comparison of byte strings.
587      * If the existing value in the database is not present or shorter than
588      * ``param``, it is first extended to the length of ``param`` with zero
589      * bytes.
590      * If ``param`` is shorter than the existing value in the database, the
591      * existing value is truncated to match the length of ``param``.
592      * The larger of the two values is then stored in the database.
593      */
594     void bitMax(in Key key, in Value value) const
595     {
596         callAtomicOperation(key, value, MutationType.MAX);
597     }
598 
599     /**
600      * Performs a little-endian comparison of byte strings.
601      * If the existing value in the database is not present or shorter than
602      * ``param``, it is first extended to the length of ``param`` with zero
603      * bytes.
604      * If ``param`` is shorter than the existing value in the database, the
605      * existing value is truncated to match the length of ``param``.
606      * The smaller of the two values is then stored in the database.
607      */
608     void bitMin(in Key key, in Value value) const
609     {
610         callAtomicOperation(key, value, MutationType.MIN);
611     }
612 
613     private void callAtomicOperation(
614         in Key          key,
615         in Value        value,
616         in MutationType type) const
617     in
618     {
619         enforce(key !is null);
620         enforce(!key.empty);
621         enforce(value !is null);
622         enforce(!value.empty);
623     }
624     body
625     {
626         fdb_transaction_atomic_op(
627             cast(TransactionHandle)th,
628             &key[0],
629             cast(int)key.length,
630             &value[0],
631             cast(int)value.length,
632             type);
633     }
634 
635     /**
636      * The transaction, if not self-conflicting, may be committed a second time
637      * after commit succeeds, in the event of a fault
638      */
639     void setCausalWriteRisky() const
640     {
641         setTransactionOption(TransactionOption.CAUSAL_WRITE_RISKY);
642     }
643 
644     /**
645      * The read version will be committed, and usually will be the latest
646      * committed, but might not be the latest committed in the event of a fault
647      * or partition
648      */
649     void setCausalReadRisky() const
650     {
651         setTransactionOption(TransactionOption.CAUSAL_READ_RISKY);
652     }
653 
654     void setCausalReadDisable() const
655     {
656         setTransactionOption(TransactionOption.CAUSAL_READ_DISABLE);
657     }
658 
659     /**
660      * The next write performed on this transaction will not generate a write
661      * conflict range. As a result, other transactions which read the key(s)
662      * being modified by the next write will not conflict with this transaction.
663      * Care needs to be taken when using this option on a transaction that is
664      * shared between multiple threads. When setting this option, write conflict
665      * ranges will be disabled on the next write operation, regardless of what
666      * thread it is on.
667      */
668     void setNextWriteNoWriteConflictRange() const
669     {
670         setTransactionOption(
671             TransactionOption.NEXT_WRITE_NO_WRITE_CONFLICT_RANGE);
672     }
673 
674     void setCheckWritesEnable() const
675     {
676         setTransactionOption(TransactionOption.CHECK_WRITES_ENABLE);
677     }
678 
679     /**
680      * Reads performed by a transaction will not see any prior mutations that
681      * occured in that transaction, instead seeing the value which was in the
682      * database at the transaction's read version. This option may provide a
683      * small performance benefit for the client, but also disables a number of
684      * client-side optimizations which are beneficial for transactions which
685      * tend to read and write the same keys within a single transaction.
686      */
687     void setReadYourWritesDisable() const
688     {
689         setTransactionOption(TransactionOption.READ_YOUR_WRITES_DISABLE);
690     }
691 
692     /**
693      * Disables read-ahead caching for range reads. Under normal operation, a
694      * transaction will read extra rows from the database into cache if range
695      * reads are used to page through a series of data one row at a time (i.e.
696      * if a range read with a one row limit is followed by another one row range
697      * read starting immediately after the result of the first).
698      */
699     void setReadAheadDisable() const
700     {
701         setTransactionOption(TransactionOption.READ_AHEAD_DISABLE);
702     }
703 
704     /**
705      * Specifies that this transaction should be treated as highest priority and
706      * that lower priority transactions should block behind this one. Use is
707      * discouraged outside of low-level tools
708      */
709     void setPrioritySystemImmediate() const
710     {
711         setTransactionOption(TransactionOption.PRIORITY_SYSTEM_IMMEDIATE);
712     }
713 
714     /**
715      * Specifies that this transaction should be treated as low priority and
716      * that default priority transactions should be processed first. Useful for
717      * doing batch work simultaneously with latency-sensitive work
718      */
719     void setPriorityBatch() const
720     {
721         setTransactionOption(TransactionOption.PRIORITY_BATCH);
722     }
723 
724     /**
725      * This is a write-only transaction which sets the initial configuration
726      */
727     void setInitializeNewDatabase() const
728     {
729         setTransactionOption(TransactionOption.INITIALIZE_NEW_DATABASE);
730     }
731 
732     /**
733      * Allows this transaction to read and modify system keys (those that start
734      * with the byte 0xFF)
735      */
736     void setAccessSystemKeys() const
737     {
738         setTransactionOption(TransactionOption.ACCESS_SYSTEM_KEYS);
739     }
740 
741     /**
742      * Allows this transaction to read system keys (those that start with the
743      * byte 0xFF)
744      */
745     void setReadSystemKeys() const
746     {
747         setTransactionOption(TransactionOption.READ_SYSTEM_KEYS);
748     }
749 
750     void setDebugDump() const
751     {
752         setTransactionOption(TransactionOption.DEBUG_DUMP);
753     }
754 
755     /**
756      * Params:
757      *      transactionName = Optional transaction name
758      */
759     void setDebugRetryLogging(in string transactionName = null) const
760     {
761         setTransactionOption(
762             TransactionOption.DEBUG_RETRY_LOGGING,
763             transactionName);
764     }
765 
766     /**
767      * Set a timeout in milliseconds which, when elapsed, will cause the
768      * transaction automatically to be cancelled. Valid parameter values are
769      * ``[0, INT_MAX]``. If set to 0, will disable all timeouts. All pending and
770      * any future uses of the transaction will throw an exception. The
771      * transaction can be used again after it is reset.
772      * Params:
773      *      value = value in milliseconds of timeout
774      */
775     void setTimeout(in int value) const
776     {
777         setTransactionOption(TransactionOption.TIMEOUT, value);
778     }
779 
780     /**
781      * Set a maximum number of retries after which additional calls to onError
782      * will throw the most recently seen error code. Valid parameter values are
783      * ``[-1, INT_MAX]``. If set to -1, will disable the retry limit.
784      * Params:
785      *      value = number of times to retry
786      */
787     void setRetryLimit(in int value) const
788     {
789         setTransactionOption(TransactionOption.RETRY_LIMIT, value);
790     }
791 
792     /**
793      * Set the maximum amount of backoff delay incurred in the call to onError
794      * if the error is retryable.
795      * Defaults to 1000 ms. Valid parameter values are [0, int.MaxValue].
796      * Like all transaction options, the maximum retry delay must be reset
797      * after a call to onError.
798      * If the maximum retry delay is less than the current retry delay of the
799      * transaction, then the current retry delay will be clamped to the maximum
800      * retry delay.
801      * Params:
802      *      value = value in milliseconds of maximum delay
803      */
804     void setMaxRetryDelayLimit(in int value) const
805     {
806         setTransactionOption(TransactionOption.MAX_RETRY_DELAY, value);
807     }
808 
809     /**
810      * Snapshot read operations will see the results of writes done in the same
811      * transaction.
812      */
813     void setSnapshotReadYourWriteEnable() const
814     {
815         setTransactionOption(TransactionOption.SNAPSHOT_READ_YOUR_WRITE_ENABLE);
816     }
817 
818     /**
819      * Snapshot read operations will not see the results of writes done in the
820      * same transaction.
821      */
822     void setSnapshotReadYourWriteDisable() const
823     {
824         setTransactionOption(
825             TransactionOption.SNAPSHOT_READ_YOUR_WRITE_DISABLE);
826     }
827 
828     private void setTransactionOption(in TransactionOption op) const
829     {
830         auto err = fdb_transaction_set_option(
831             cast(TransactionHandle)th,
832             op,
833             null,
834             0);
835         enforceError(err);
836     }
837 
838     private void setTransactionOption(
839         in TransactionOption op,
840         in int               value) const
841     {
842         auto err = fdb_transaction_set_option(
843             cast(TransactionHandle)th,
844             op,
845             cast(immutable(char)*)&value,
846             cast(int)value.sizeof);
847         enforceError(err);
848     }
849 
850     private void setTransactionOption(
851         in TransactionOption op,
852         in string            value) const
853     {
854         auto err = fdb_transaction_set_option(
855             cast(TransactionHandle)th,
856             op,
857             value.toStringz,
858             cast(int)value.length);
859         enforceError(err);
860     }
861 
862     shared(Value) opIndex(in Key key)
863     {
864         return get(key);
865     }
866 
867     RecordRange opIndex(RangeInfo info)
868     {
869         return getRange(info);
870     }
871 
872     inout(Value) opIndexAssign(inout(Value) value, in Key key)
873     {
874         set(key, value);
875         return value;
876     }
877 
878     void run(in WorkFunc func)
879     {
880         retryLoop(this, func);
881     };
882 
883     auto runAsync(in WorkFunc func, in VoidFutureCallback commitCallback)
884     {
885         auto future = createFuture!retryLoopAsync(this, func, commitCallback);
886         return future;
887     };
888 }
889 
890 void retryLoop(shared Transaction tr, in WorkFunc func)
891 {
892     while (true)
893     {
894         try
895         {
896             func(tr);
897             tr.commit;
898             return;
899         }
900         catch (Exception ex)
901         {
902             if (auto fdbex = cast(FDBException)ex)
903                 tr.onError(fdbex);
904             else
905             {
906                 // the error cannot be retried so we need to cancel the
907                 // transaction
908                 tr.cancel;
909                 throw ex;
910             }
911         }
912     }
913 }
914 
915 void retryLoopAsync(
916     shared Transaction    tr,
917     in WorkFunc           func,
918     in VoidFutureCallback cb)
919 {
920     while (true)
921     {
922         try
923         {
924             func(tr);
925             tr.commit;
926             cb(null);
927             return;
928         }
929         catch (Exception ex)
930         {
931             if (auto fdbex = cast(FDBException)ex)
932                 tr.onError(fdbex);
933             else
934             {
935                 // the error cannot be retried so we need to cancel the
936                 // transaction
937                 tr.cancel;
938                 cb(cast(Exception) ex);
939                 return;
940             }
941         }
942     }
943 }