1 module fdb.transaction;
2 
3 import
4     std.array,
5     std.conv,
6     std.exception;
7 
8 import
9     fdb.database,
10     fdb.direct,
11     fdb.disposable,
12     fdb.error,
13     fdb.fdb_c,
14     fdb.fdb_c_options,
15     fdb.future,
16     fdb.range,
17     fdb.rangeinfo;
18 
19 shared interface IReadOnlyTransaction
20 {
21     @property bool isSnapshot();
22 
23     shared(KeyFuture) getKey(
24         const Selector    selector,
25         KeyFutureCallback callback = null);
26 
27     shared(ValueFuture) get(
28         const Key           key,
29         ValueFutureCallback callback = null);
30 
31     /**
32      * Returns: Key-value pairs within (begin, end) range
33      */
34     shared(KeyValueFuture) getRange(
35         RangeInfo              info,
36         KeyValueFutureCallback callback = null);
37 
38     void addReadConflictRange(RangeInfo info);
39     void addWriteConflictRange(RangeInfo info);
40 
41     shared(VoidFuture) onError(
42         const FDBException ex,
43         VoidFutureCallback callback = null);
44 
45     shared(VersionFuture) getReadVersion(VersionFutureCallback callback = null);
46 
47     long getCommittedVersion();
48 
49     shared(StringFuture) getAddressesForKey(
50         const Key            key,
51         StringFutureCallback callback = null);
52 
53     shared(Value) opIndex(const Key key);
54 
55     RecordRange opIndex(RangeInfo info);
56 }
57 
58 alias WorkFunc = void delegate(shared Transaction tr, VoidFutureCallback cb);
59 
60 shared class Transaction : IDirect, IDisposable, IReadOnlyTransaction
61 {
62     private const Database    db;
63     private TransactionHandle th;
64     private const bool _isSnapshot;
65 
66     @property bool isSnapshot()
67     {
68         return _isSnapshot;
69     }
70 
71     private IDisposable[] futures;
72 
73     invariant()
74     {
75         assert(db !is null);
76     }
77 
78     this(
79         TransactionHandle     th,
80         const shared Database db)
81     in
82     {
83         enforce(db !is null, "db must be set");
84         enforce(th !is null, "th must be set");
85     }
86     body
87     {
88         this.th          = cast(shared)th;
89         this.db          = db;
90         this._isSnapshot = false;
91     }
92 
93     invariant()
94     {
95         assert(db !is null);
96     }
97 
98     private this(
99         shared TransactionHandle th,
100         const shared Database    db,
101         const bool               isSnapshot)
102     in
103     {
104         enforce(db !is null, "db must be set");
105         enforce(th !is null, "th must be set");
106     }
107     body
108     {
109         this.th          = cast(shared)th;
110         this.db          = db;
111         this._isSnapshot = isSnapshot;
112     }
113 
114     @property shared(IReadOnlyTransaction) snapshot()
115     {
116         auto snapshot = new shared Transaction(th, db, true);
117         return cast(shared IReadOnlyTransaction)snapshot;
118     }
119 
120     ~this()
121     {
122         dispose;
123     }
124 
125     void dispose()
126     {
127         // parent transaction should handle destruction
128         if (!th || isSnapshot) return;
129 
130         fdb_transaction_destroy(cast(TransactionHandle)th);
131         th = null;
132     }
133 
134     void set(const Key key, const Value value) const
135     in
136     {
137         enforce(key !is null);
138         enforce(!key.empty);
139         enforce(value !is null);
140         enforce(!value.empty);
141     }
142     body
143     {
144         fdb_transaction_set(
145             cast(TransactionHandle)th,
146             &key[0],
147             cast(int)key.length,
148             &value[0],
149             cast(int)value.length);
150     }
151 
152     auto commit(VoidFutureCallback callback = null)
153     {
154         // cancel, commit and reset are mutually exclusive
155         synchronized (this)
156         {
157             auto fh      = fdb_transaction_commit(cast(TransactionHandle)th);
158             auto future  = startOrCreateFuture!VoidFuture(fh, this, callback);
159             futures     ~= future;
160             return future;
161         }
162     }
163 
164     void cancel()
165     {
166         // cancel, commit and reset are mutually exclusive
167         synchronized (this)
168             fdb_transaction_cancel(cast(TransactionHandle)th);
169     }
170 
171     /**
172      * Resets transaction to its initial state
173      */
174     void reset()
175     {
176         // cancel, commit and reset are mutually exclusive
177         synchronized (this)
178             fdb_transaction_reset(cast(TransactionHandle)th);
179     }
180 
181     void clear(const Key key) const
182     in
183     {
184         enforce(key !is null);
185         enforce(!key.empty);
186     }
187     body
188     {
189         fdb_transaction_clear(
190             cast(TransactionHandle)th,
191             &key[0],
192             cast(int)key.length);
193     }
194 
195     void clearRange(const RangeInfo info) const
196     in
197     {
198         enforce(!info.begin.key.empty);
199         enforce(!info.end.key.empty);
200     }
201     body
202     {
203         fdb_transaction_clear_range(
204             cast(TransactionHandle)th,
205             &info.begin.key[0],
206             cast(int)info.begin.key.length,
207             &info.end.key[0],
208             cast(int)info.end.key.length);
209     }
210 
211     shared(KeyFuture) getKey(
212         const Selector    selector,
213         KeyFutureCallback callback = null)
214     {
215         auto fh = fdb_transaction_get_key(
216             cast(TransactionHandle)th,
217             &selector.key[0],
218             cast(int)selector.key.length,
219             cast(fdb_bool_t)selector.orEqual,
220             selector.offset,
221             cast(fdb_bool_t)_isSnapshot);
222 
223         auto future = startOrCreateFuture!KeyFuture(fh, this, callback);
224         synchronized (this)
225             futures ~= future;
226         return future;
227     }
228 
229     shared(ValueFuture) get(
230         const Key           key,
231         ValueFutureCallback callback = null)
232     in
233     {
234         enforce(key !is null);
235         enforce(!key.empty);
236     }
237     body
238     {
239         auto fh = fdb_transaction_get(
240             cast(TransactionHandle)th,
241             &key[0],
242             cast(int)key.length,
243             cast(fdb_bool_t)_isSnapshot);
244 
245         auto future = startOrCreateFuture!ValueFuture(fh, this, callback);
246         synchronized (this)
247             futures ~= future;
248         return future;
249     }
250 
251     /**
252      * Returns: Key-value pairs within (begin, end) range
253      */
254     shared(KeyValueFuture) getRange(
255         RangeInfo              info,
256         KeyValueFutureCallback callback = null)
257     {
258         auto begin = sanitizeKey(info.begin.key, [ 0x00 ]);
259         auto end   = sanitizeKey(info.end.key, [ 0xff ]);
260 
261         auto fh = fdb_transaction_get_range(
262             cast(TransactionHandle)cast(TransactionHandle)th,
263 
264             &begin[0],
265             cast(int)begin.length,
266             cast(fdb_bool_t)info.begin.orEqual,
267             info.begin.offset,
268 
269             &end[0],
270             cast(int)end.length,
271             cast(fdb_bool_t)info.end.orEqual,
272             info.end.offset,
273 
274             info.limit,
275             0,
276             info.mode,
277             info.iteration,
278             cast(fdb_bool_t)_isSnapshot,
279             info.reverse);
280 
281         auto future = startOrCreateFuture!KeyValueFuture(
282             fh, this, info, callback);
283         synchronized (this)
284             futures ~= future;
285         return future;
286     }
287 
288     auto watch(const Key key, VoidFutureCallback callback = null)
289     in
290     {
291         enforce(key !is null);
292         enforce(!key.empty);
293     }
294     body
295     {
296         auto fh = fdb_transaction_watch(
297             cast(TransactionHandle)th,
298             &key[0],
299             cast(int)key.length);
300         auto future = startOrCreateFuture!WatchFuture(fh, this, callback);
301         synchronized (this)
302             futures ~= future;
303         return future;
304     }
305 
306     private void addConflictRange(
307         RangeInfo               info,
308         const ConflictRangeType type) const
309     in
310     {
311         enforce(!info.begin.key.empty);
312         enforce(!info.end.key.empty);
313     }
314     body
315     {
316         auto err = fdb_transaction_add_conflict_range(
317             cast(TransactionHandle)th,
318             &info.begin.key[0],
319             cast(int)info.begin.key.length,
320             &info.end.key[0],
321             cast(int)info.end.key.length,
322             type);
323         enforceError(err);
324     }
325 
326     void addReadConflictRange(RangeInfo info) const
327     {
328         addConflictRange(info, ConflictRangeType.READ);
329     }
330 
331     void addWriteConflictRange(RangeInfo info) const
332     {
333         addConflictRange(info, ConflictRangeType.WRITE);
334     }
335 
336     shared(VoidFuture) onError(
337         const FDBException ex,
338         VoidFutureCallback callback = null)
339     {
340         auto fh = fdb_transaction_on_error(
341             cast(TransactionHandle)th,
342             ex.err);
343         auto future = startOrCreateFuture!VoidFuture(fh, this, callback);
344         synchronized (this)
345             futures ~= future;
346         return future;
347     }
348 
349     void setReadVersion(const int ver) const
350     in
351     {
352         enforce(ver > 0);
353     }
354     body
355     {
356         fdb_transaction_set_read_version(
357             cast(TransactionHandle)th,
358             ver);
359     }
360 
361     shared(VersionFuture) getReadVersion(VersionFutureCallback callback = null)
362     {
363         auto fh = fdb_transaction_get_read_version(
364             cast(TransactionHandle)th);
365         auto future = startOrCreateFuture!VersionFuture(fh, this, callback);
366         synchronized (this)
367             futures ~= future;
368         return future;
369     }
370 
371     long getCommittedVersion() const
372     {
373         long ver;
374         auto err = fdb_transaction_get_committed_version(
375             cast(TransactionHandle)th,
376             &ver);
377         enforceError(err);
378         return ver;
379     }
380 
381     shared(StringFuture) getAddressesForKey(
382         const Key            key,
383         StringFutureCallback callback = null)
384     in
385     {
386         enforce(key !is null);
387         enforce(!key.empty);
388     }
389     body
390     {
391         auto fh = fdb_transaction_get_addresses_for_key(
392             cast(TransactionHandle)th,
393             &key[0],
394             cast(int)key.length);
395 
396         auto future = startOrCreateFuture!StringFuture(fh, this, callback);
397         synchronized (this)
398             futures ~= future;
399         return future;
400     }
401 
402     /**
403      * Performs an addition of little-endian integers. If the existing value
404      * in the database is not present or shorter than ``param``, it is first
405      * extended to the length of ``param`` with zero bytes.  If ``param`` is
406      * shorter than the existing value in the database, the existing value is
407      * truncated to match the length of ``param``. The integers to be added
408      * must be stored in a little-endian representation.  They can be signed
409      * in two's complement representation or unsigned. You can add to an integer
410      * at a known offset in the value by prepending the appropriate number of
411      * zero bytes to ``param`` and padding with zero bytes to match the length
412      * of the value. However, this offset technique requires that you know the
413      * addition will not cause the integer field within the value to overflow.
414      */
415     void add(const Key key, const Value value) const
416     {
417         callAtomicOperation(key, value, MutationType.ADD);
418     }
419 
420     // Deprecated
421     // ADD_MUTATION_TYPE("and", 6);
422 
423     /**
424      * Performs a bitwise ``and`` operation.  If the existing value in the
425      * database is not present or shorter than ``param``, it is first extended
426      * to the length of ``param`` with zero bytes.  If ``param`` is shorter than
427      * the existing value in the database, the existing value is truncated to
428      * match the length of ``param``.
429      */
430     void bitAnd(const Key key, const Value value) const
431     {
432         callAtomicOperation(key, value, MutationType.BIT_AND);
433     }
434 
435     // Deprecated
436     //ADD_MUTATION_TYPE("or", 7);
437 
438     /**
439      * Performs a bitwise ``or`` operation.  If the existing value in the
440      * database is not present or shorter than ``param``, it is first extended
441      * to the length of ``param`` with zero bytes.  If ``param`` is shorter than
442      * the existing value in the database, the existing value is truncated to
443      * match the length of ``param``.
444      */
445     void bitOr(const Key key, const Value value) const
446     {
447         callAtomicOperation(key, value, MutationType.BIT_OR);
448     }
449 
450     // Deprecated
451     // ADD_MUTATION_TYPE("xor", 8);
452 
453     /**
454      * Performs a bitwise ``xor`` operation.  If the existing value in the
455      * database is not present or shorter than ``param``, it is first extended
456      * to the length of ``param`` with zero bytes.  If ``param`` is shorter than
457      * the existing value in the database, the existing value is truncated to
458      * match the length of ``param``.
459      */
460     void bitXor(const Key key, const Value value) const
461     {
462         callAtomicOperation(key, value, MutationType.BIT_XOR);
463     }
464 
465     private void callAtomicOperation(
466         const Key          key,
467         const Value        value,
468         const MutationType type) const
469     in
470     {
471         enforce(key !is null);
472         enforce(!key.empty);
473         enforce(value !is null);
474         enforce(!value.empty);
475     }
476     body
477     {
478         fdb_transaction_atomic_op(
479             cast(TransactionHandle)th,
480             &key[0],
481             cast(int)key.length,
482             &value[0],
483             cast(int)value.length,
484             type);
485     }
486 
487     /**
488      * The transaction, if not self-conflicting, may be committed a second time
489      * after commit succeeds, in the event of a fault
490      * Parameter: Option takes no parameter
491      */
492     void setCausalWriteRisky() const
493     {
494         setTransactionOption(TransactionOption.CAUSAL_WRITE_RISKY);
495     }
496 
497     /**
498      * The read version will be committed, and usually will be the latest
499      * committed, but might not be the latest committed in the event of a fault
500      * or partition
501      * Parameter: Option takes no parameter
502      */
503     void setCausalReadRisky() const
504     {
505         setTransactionOption(TransactionOption.CAUSAL_READ_RISKY);
506     }
507 
508     // Parameter: Option takes no parameter
509     void setCausalReadDisable() const
510     {
511         setTransactionOption(TransactionOption.CAUSAL_READ_DISABLE);
512     }
513 
514     /**
515      * The next write performed on this transaction will not generate a write
516      * conflict range. As a result, other transactions which read the key(s)
517      * being modified by the next write will not conflict with this transaction.
518      * Care needs to be taken when using this option on a transaction that is
519      * shared between multiple threads. When setting this option, write conflict
520      * ranges will be disabled on the next write operation, regardless of what
521      * thread it is on.
522      * Parameter: Option takes no parameter
523      */
524     void setNextWriteNoWriteConflictRange() const
525     {
526         setTransactionOption(
527             TransactionOption.NEXT_WRITE_NO_WRITE_CONFLICT_RANGE);
528     }
529 
530     // Parameter: Option takes no parameter
531     void setCheckWritesEnable() const
532     {
533         setTransactionOption(TransactionOption.CHECK_WRITES_ENABLE);
534     }
535 
536     /**
537      * Reads performed by a transaction will not see any prior mutations that
538      * occured in that transaction, instead seeing the value which was in the
539      * database at the transaction's read version. This option may provide a
540      * small performance benefit for the client, but also disables a number of
541      * client-side optimizations which are beneficial for transactions which
542      * tend to read and write the same keys within a single transaction.
543      * Parameter: Option takes no parameter
544      */
545     void setReadYourWritesDisable() const
546     {
547         setTransactionOption(TransactionOption.READ_YOUR_WRITES_DISABLE);
548     }
549 
550     /**
551      * Disables read-ahead caching for range reads. Under normal operation, a
552      * transaction will read extra rows from the database into cache if range
553      * reads are used to page through a series of data one row at a time (i.e.
554      * if a range read with a one row limit is followed by another one row range
555      * read starting immediately after the result of the first).
556      * Parameter: Option takes no parameter
557      */
558     void setReadAheadDisable() const
559     {
560         setTransactionOption(TransactionOption.READ_AHEAD_DISABLE);
561     }
562 
563     // Parameter: Option takes no parameter
564     void setDurabilityDatacenter() const
565     {
566         setTransactionOption(TransactionOption.DURABILITY_DATACENTER);
567     }
568 
569     // Parameter: Option takes no parameter
570     void setDurabilityRisky() const
571     {
572         setTransactionOption(TransactionOption.DURABILITY_RISKY);
573     }
574 
575     // Parameter: Option takes no parameter
576     void setDurabilityDevNullIsWebScale() const
577     {
578         setTransactionOption(
579             TransactionOption.DURABILITY_DEV_NULL_IS_WEB_SCALE);
580     }
581 
582     /**
583      * Specifies that this transaction should be treated as highest priority and
584      * that lower priority transactions should block behind this one. Use is
585      * discouraged outside of low-level tools
586      * Parameter: Option takes no parameter
587      */
588     void setPrioritySystemImmediate() const
589     {
590         setTransactionOption(TransactionOption.PRIORITY_SYSTEM_IMMEDIATE);
591     }
592 
593     /**
594      * Specifies that this transaction should be treated as low priority and
595      * that default priority transactions should be processed first. Useful for
596      * doing batch work simultaneously with latency-sensitive work
597      * Parameter: Option takes no parameter
598      */
599     void setPriorityBatch() const
600     {
601         setTransactionOption(TransactionOption.PRIORITY_BATCH);
602     }
603 
604     /**
605      * This is a write-only transaction which sets the initial configuration
606      * Parameter: Option takes no parameter
607      */
608     void setInitializeNewDatabase() const
609     {
610         setTransactionOption(TransactionOption.INITIALIZE_NEW_DATABASE);
611     }
612 
613     /**
614      * Allows this transaction to read and modify system keys (those that start
615      * with the byte 0xFF)
616      * Parameter: Option takes no parameter
617      */
618     void setAccessSystemKeys() const
619     {
620         setTransactionOption(TransactionOption.ACCESS_SYSTEM_KEYS);
621     }
622 
623     // Parameter: Option takes no parameter
624     void setDebugDump() const
625     {
626         setTransactionOption(TransactionOption.DEBUG_DUMP);
627     }
628 
629     /**
630      * Set a timeout in milliseconds which, when elapsed, will cause the
631      * transaction automatically to be cancelled. Valid parameter values are
632      * ``[0, INT_MAX]``. If set to 0, will disable all timeouts. All pending and
633      * any future uses of the transaction will throw an exception. The
634      * transaction can be used again after it is reset.
635      * Parameter: (Int) value in milliseconds of timeout
636      */
637     void setTimeout(const long value) const
638     {
639         setTransactionOption(TransactionOption.TIMEOUT, value);
640     }
641 
642     /**
643      * Set a maximum number of retries after which additional calls to onError
644      * will throw the most recently seen error code. Valid parameter values are
645      * ``[-1, INT_MAX]``. If set to -1, will disable the retry limit.
646      * Parameter: (Int) number of times to retry
647      */
648     void setRetryLimit(const long value) const
649     {
650         setTransactionOption(TransactionOption.RETRY_LIMIT, value);
651     }
652 
653     private void setTransactionOption(const TransactionOption op) const
654     {
655         auto err = fdb_transaction_set_option(
656             cast(TransactionHandle)th,
657             op,
658             null,
659             0);
660         enforceError(err);
661     }
662 
663     private void setTransactionOption(
664         const TransactionOption op,
665         const long              value) const
666     {
667         auto err = fdb_transaction_set_option(
668             cast(TransactionHandle)th,
669             op,
670             cast(immutable(char)*)&value,
671             cast(int)value.sizeof);
672         enforceError(err);
673     }
674 
675     shared(Value) opIndex(const Key key)
676     {
677         auto f     = get(key);
678         auto value = f.await;
679         return value;
680     }
681 
682     RecordRange opIndex(RangeInfo info)
683     {
684         auto f     = getRange(info);
685         auto value = cast(RecordRange)f.await;
686         return value;
687     }
688 
689     inout(Value) opIndexAssign(inout(Value) value, const Key key)
690     {
691         set(key, value);
692         return value;
693     }
694 
695     void run(SimpleWorkFunc func)
696     {
697         WorkFunc wf = (tr, cb)
698         {
699             func(tr);
700             cb(null);
701         };
702 
703         VoidFutureCallback cb = (ex)
704         {
705             enforce(ex is null, ex);
706         };
707 
708         auto future = createFuture!retryLoop(this, wf, cb);
709         future.await;
710     };
711 
712     auto doTransaction(
713         WorkFunc           func,
714         VoidFutureCallback commitCallback)
715     {
716         auto future = createFuture!retryLoop(this, func, commitCallback);
717         return future;
718     };
719 }
720 
721 void retryLoop(
722     shared Transaction tr,
723     WorkFunc           func,
724     VoidFutureCallback cb)
725 {
726     try
727     {
728         func(tr, (ex)
729         {
730             if (ex)
731                 onError(tr, ex, func, cb);
732             else
733             {
734                 auto future = tr.commit((commitErr)
735                 {
736                     if (commitErr)
737                         onError(tr, commitErr, func, cb);
738                     else
739                         cb(commitErr);
740                 });
741                 future.await;
742             }
743         });
744     }
745     catch (Exception ex)
746     {
747         onError(tr, ex, func, cb);
748     }
749 }
750 
751 private void onError(
752     shared Transaction tr,
753     Exception          ex,
754     WorkFunc           func,
755     VoidFutureCallback cb)
756 {
757     if (auto fdbex = cast(FDBException)ex)
758     {
759         tr.onError(fdbex, (retryErr)
760         {
761             if (retryErr)
762                 cb(retryErr);
763             else
764                 retryLoop(tr, func, cb);
765         });
766     }
767     else
768     {
769         tr.cancel();
770         cb(ex);
771     }
772 };