1 module fdb.future;
2 
3 import
4     core.sync.semaphore,
5     core.thread;
6 
7 import
8     std.algorithm,
9     std.array,
10     std.conv,
11     std.exception,
12     std.parallelism,
13     std.traits;
14 
15 import
16     fdb.disposable,
17     fdb.error,
18     fdb.fdb_c,
19     fdb.range,
20     fdb.rangeinfo,
21     fdb.transaction;
22 
/// Delegate handed the outcome of an asynchronous operation: the exception
/// that ended it, or `null` when it finished without one.
alias CompletionCallback = void delegate(Exception ex);
24 
/**
 * Constructor boilerplate for exception classes.
 *
 * Mixing this into a class adds two constructors that forward their
 * arguments unchanged to the matching `super` constructor.
 */
private mixin template ExceptionCtorMixin()
{
    /// Message (optional) plus an optional chained throwable.
    this(string msg = null, Throwable next = null) { super(msg, next); }

    /// Message with an explicit source location and optional chained throwable.
    this(string msg, string file, size_t line, Throwable next = null)
    {
        super(msg, file, line, next);
    }
}
37 
/// Exception class declared by the future module; its constructors are the
/// ones injected by `ExceptionCtorMixin`.
class FutureException : Exception
{
    mixin ExceptionCtorMixin!();
}
42 
/**
 * Root of the future hierarchy.
 *
 * Params:
 *     V = type of the result produced by the future; `void` for futures
 *         that only signal completion.
 */
shared class FutureBase(V)
{
    // A `void` future has nothing to store, so the result slot is only
    // declared for every other V.
    static if (!is(V == void))
        protected V value;

    /// Failure recorded by the concrete future, or `null` if there is none.
    protected Exception exception;

    /// Waits for the result and returns it. Implemented by subclasses.
    abstract shared(V) await();
}
54 
/**
 * Future that runs an ordinary function as a `std.parallelism` task.
 *
 * Params:
 *     fun  = function to execute
 *     pool = when `true` the task is queued on the global `taskPool`,
 *            otherwise it is executed in a new thread
 *     Args = types of the arguments forwarded to `fun`
 */
shared class FunctionFuture(alias fun, bool pool = true, Args...) :
    FutureBase!(ReturnType!fun),

    // dummy implementation to allow storage in KeyValueFuture
    IDisposable
{
    alias V = ReturnType!fun;
    alias T = Task!(fun, ParameterTypeTuple!fun) *;
    private T t;

    /// Creates the task for `fun(args)` and starts it immediately.
    this(Args args)
    {
        t = cast(shared)task!fun(args);
        // The Task API is not `shared`-qualified, so work on an unshared view.
        auto localTask = cast(T)t;
        static if (pool)
            taskPool.put(localTask);
        else
            localTask.executeInNewThread;
    }

    /// No-op; present only to satisfy `IDisposable`.
    void dispose() {}

    /**
     * Waits for the task to finish (via `yieldForce`).
     *
     * Returns: the value returned by `fun` (nothing when `V` is `void`).
     * Throws: the `Exception` raised by `fun`, which is also stored in
     *         `exception`.
     */
    override shared(V) await()
    {
        try
        {
            auto localTask = cast(T)t;
            static if (!is(V == void))
                // Fixed: the original referenced the undeclared identifier
                // `localtask`, so every non-void instantiation failed to
                // compile. The cast mirrors FDBFutureBase.await, which also
                // stores its result through `cast(shared)`.
                value = cast(shared)localTask.yieldForce;
            else
                localTask.yieldForce;
        }
        catch (Exception ex)
        {
            exception = cast(shared)ex;
        }

        // Rethrows the captured exception, if any.
        enforce(exception is null, cast(Exception)exception);
        static if (!is(V == void))
            return value;
    }
}
97 
/// Delegate type for callbacks that receive an exception argument together
/// with a value of type `V`.
alias FutureCallback(V) = void delegate(Exception ex, V value);
99 
/**
 * Base class of the futures that wrap a native `FutureHandle`.
 *
 * Params:
 *     C = delegate type of the completion callback
 *     V = type of the value the future yields (`void` for none)
 *
 * Subclasses implement `extractValue` to read the typed result out of the
 * handle.
 */
shared class FDBFutureBase(C, V) : FutureBase!V, IDisposable
{
    private alias SF  = shared FDBFutureBase!(C, V);
    private alias SFH = shared FutureHandle;
    private alias SE  = shared fdb_error_t;

    private FutureHandle fh;
    private Transaction  tr;
    private C            callbackFunc;

    /// Stores the native handle and the transaction passed by the caller.
    this(FutureHandle fh, shared Transaction tr)
    {
        this.fh = cast(shared)fh;
        this.tr = tr;
    }

    /// Destroys the native future if `dispose` has not done so already.
    ~this()
    {
        dispose;
    }

    /// Destroys the native future and clears the handle; calling it again
    /// afterwards does nothing.
    void dispose()
    {
        if (fh)
        {
            // NB : Also releases the memory returned by get functions
            fdb_future_destroy(cast(FutureHandle)fh);
            fh = null;
        }
    }

    /**
     * Remembers `callbackFunc` and registers `futureReady` as the native
     * callback of the handle, passing this object as the user pointer.
     *
     * Returns: this future.
     * Throws: whatever `enforceError` throws for the status returned by
     *         `fdb_future_set_callback`.
     */
    auto start(C callbackFunc)
    {
        this.callbackFunc = cast(shared)callbackFunc;

        const status = fdb_future_set_callback(
            cast(FutureHandle) fh,
            cast(FDBCallback)  &futureReady,
            cast(void*)        this);
        enforceError(status);

        return this;
    }

    /**
     * Blocks until the native future is ready and returns its value.
     *
     * A non-null `callbackFunc` is registered through `start` first.
     *
     * Throws: the exception derived from the native error code, either from
     *         waiting or from extracting the value.
     */
    shared(V) await(C callbackFunc)
    {
        if (callbackFunc)
            start(callbackFunc);

        SE err = fdb_future_block_until_ready(cast(FutureHandle)fh);
        if (err != FDBError.SUCCESS)
        {
            // Waiting itself failed: record the error and throw it.
            exception = cast(shared)err.toException;
            enforce(exception is null, cast(Exception)exception);
        }

        // extractValue overwrites `err` with the status of the read.
        static if (is(V == void))
            extractValue(fh, err);
        else
            value = cast(shared)extractValue(fh, err);

        exception = cast(shared)err.toException;
        enforce(exception is null, cast(Exception)exception);

        static if (!is(V == void))
            return value;
    }

    /// Blocks until the future is ready, without registering a callback.
    override shared(V) await()
    {
        static if (is(V == void))
            await(null);
        else
            return await(null);
    }

    /// Native callback registered by `start`: attaches the calling thread to
    /// the D runtime and hands the actual work to the task pool.
    extern(C) static void futureReady(SFH f, SF thiz)
    {
        thread_attachThis;
        // or executeInNewThread?
        auto readyTask = task!worker(f, thiz);
        taskPool.put(readyTask);
    }

    /// Task body queued by `futureReady`: reads the result from `f` and, if a
    /// callback was registered, invokes it with the resulting exception
    /// (and the value for non-void futures).
    static void worker(SFH f, SF thiz)
    {
        SE err;

        static if (is(V == void))
        {
            thiz.extractValue(f, err);
            if (thiz.callbackFunc)
                (cast(C)thiz.callbackFunc)(err.toException);
        }
        else
        {
            auto result = thiz.extractValue(f, err);
            if (thiz.callbackFunc)
                (cast(C)thiz.callbackFunc)(err.toException, result);
        }
    }

    /// Reads the typed result from `fh`, reporting the native status in `err`.
    abstract V extractValue(SFH fh, out SE err);
}
204 
/// Adds the standard future constructor, which forwards the native handle
/// and an optional transaction to the base class.
private mixin template FDBFutureCtor()
{
    this(FutureHandle fh, shared Transaction tr = null) { super(fh, tr); }
}
212 
alias ValueFutureCallback = FutureCallback!Value;

/// Future yielding a single `Value`, read with `fdb_future_get_value`.
shared class ValueFuture : FDBFutureBase!(ValueFutureCallback, Value)
{
    mixin FDBFutureCtor;

    private alias PValue = ubyte *;

    /**
     * Reads the value out of the native future.
     *
     * Returns: `null` when the call fails or reports that no value is
     *          present; otherwise the value bytes. Non-empty values are
     *          copied, so the result stays valid after the future is
     *          disposed.
     */
    override Value extractValue(SFH fh, out SE err)
    {
        PValue value;
        int    valueLength,
               valuePresent;

        err = fdb_future_get_value(
            cast(FutureHandle)fh,
            &valuePresent,
            &value,
            &valueLength);
        if (err != FDBError.SUCCESS || !valuePresent)
            return null;

        // The bytes belong to the native future and are freed by
        // fdb_future_destroy (see dispose), so returning the raw slice would
        // leave the caller with a dangling array once the future is disposed
        // or finalized. Copy them, as KeyValueFuture already does.
        auto view = value[0..valueLength];
        // An empty slice is returned as-is: nothing can be read through it,
        // and this keeps "present but empty" distinguishable from the `null`
        // returned for a missing value.
        return valueLength ? view.dup : view;
    }
}
237 
alias KeyFutureCallback = FutureCallback!Key;

/// Future yielding a single `Key`, read with `fdb_future_get_key`.
shared class KeyFuture : FDBFutureBase!(KeyFutureCallback, Key)
{
    mixin FDBFutureCtor;

    private alias PKey = ubyte *;

    /**
     * Reads the key out of the native future.
     *
     * Returns: `Key.init` when the call fails; otherwise the key bytes.
     *          Non-empty keys are copied, so the result stays valid after
     *          the future is disposed.
     */
    override Key extractValue(SFH fh, out SE err)
    {
        PKey key;
        int  keyLength;

        err = fdb_future_get_key(
            cast(FutureHandle)fh,
            &key,
            &keyLength);
        if (err != FDBError.SUCCESS)
            return typeof(return).init;

        // The bytes belong to the native future and are freed by
        // fdb_future_destroy (see dispose); copy them so the caller does not
        // end up with a dangling slice, as KeyValueFuture already does.
        auto view = key[0..keyLength];
        // An empty slice is passed through unchanged (nothing to copy).
        return keyLength ? view.dup : view;
    }
}
260 
/// Callback of value-less futures; the same delegate type as
/// `CompletionCallback`.
alias VoidFutureCallback = CompletionCallback;

/// Future that carries no value, only a completion status.
shared class VoidFuture : FDBFutureBase!(VoidFutureCallback, void)
{
    mixin FDBFutureCtor;

    /// Stores the error code of the native future in `err`; there is no
    /// value to return.
    override void extractValue(SFH fh, out SE err)
    {
        err = fdb_future_get_error(cast(FutureHandle)fh);
    }
}
273 
alias KeyValueFutureCallback   = FutureCallback!RecordRange;
/// Per-record callback for `KeyValueFuture.forEach`.
alias ForEachCallback          = void delegate(Record record);
/// Per-record callback that can stop the iteration by setting `breakLoop`.
alias BreakableForEachCallback = void delegate(
    Record   record,
    out bool breakLoop);

/// Future yielding a `RecordRange`, read with
/// `fdb_future_get_keyvalue_array`.
shared class KeyValueFuture
    : FDBFutureBase!(KeyValueFutureCallback, RecordRange)
{
    const RangeInfo info;

    // Futures created by forEach; appended under `synchronized (this)`.
    private IDisposable[] futures;

    /// Stores `info` in addition to the handle and transaction kept by the
    /// base class.
    this(FutureHandle fh, shared Transaction tr, RangeInfo info)
    {
        super(fh, tr);

        this.info = cast(shared)info;
    }

    /**
     * Copies the key/value pairs out of the native future.
     *
     * Returns: `RecordRange.init` when the native call fails; otherwise a
     *          range built from the duplicated records, the "more" flag,
     *          `info` and the transaction.
     */
    override RecordRange extractValue(SFH fh, out SE err)
    {
        FDBKeyValue * pairs;
        int           pairCount;
        // Receives true if there are more result, or false if all results have
        // been transmitted
        fdb_bool_t    more;

        err = fdb_future_get_keyvalue_array(
            cast(FutureHandle)fh,
            &pairs,
            &pairCount,
            &more);
        if (err != FDBError.SUCCESS)
            return typeof(return).init;

        auto records = minimallyInitializedArray!(Record[])(pairCount);
        foreach (i, ref record; records)
        {
            auto kv = pairs[i];
            // Duplicate the bytes instead of keeping slices of the native
            // buffers.
            record.key   = (cast(Key)  kv.key  [0..kv.key_length  ]).dup;
            record.value = (cast(Value)kv.value[0..kv.value_length]).dup;
        }

        return RecordRange(
            records,
            cast(bool)more,
            cast(RangeInfo)info,
            tr);
    }

    /**
     * Starts `foreachTask` for this future as a function future and keeps a
     * reference to it in `futures`.
     *
     * Params:
     *     fun = callback invoked for each record
     *     cb  = completion callback, see `foreachTask`
     * Returns: the future running the iteration.
     */
    auto forEach(FC)(FC fun, CompletionCallback cb)
    {
        auto loopFuture = createFuture!(foreachTask!FC)(this, fun, cb);
        synchronized (this)
        {
            futures ~= loopFuture;
        }
        return loopFuture;
    }

    /**
     * Awaits `future` and calls `fun` for every record of the resulting
     * range. A two-parameter `fun` may end the loop early through its second
     * argument.
     *
     * `cb` is called with `null` after the loop finishes, or with the caught
     * exception if anything inside the `try` block throws one.
     */
    static void foreachTask(FC)(
        shared KeyValueFuture future,
        FC                    fun,
        CompletionCallback    cb)
    {
        try
        {
            // This will block until value is ready
            auto range = cast(RecordRange)future.await;
            foreach (record; range)
            {
                static if (arity!fun == 2)
                {
                    bool breakLoop;
                    fun(record, breakLoop);
                    if (breakLoop)
                        break;
                }
                else
                {
                    fun(record);
                }
            }

            cb(null);
        }
        catch (Exception ex)
        {
            cb(ex);
        }
    }
}
360 
alias VersionFutureCallback = FutureCallback!ulong;

/// Future yielding a version number, read with `fdb_future_get_version`.
shared class VersionFuture : FDBFutureBase!(VersionFutureCallback, ulong)
{
    mixin FDBFutureCtor;

    /// Returns the version reported by the native future, or `ulong.init`
    /// when the native call fails.
    override ulong extractValue(SFH fh, out SE err)
    {
        long rawVersion;
        err = fdb_future_get_version(cast(FutureHandle)fh, &rawVersion);

        if (err == FDBError.SUCCESS)
            return rawVersion;
        return typeof(return).init;
    }
}
378 
alias StringFutureCallback = FutureCallback!(string[]);

/// Future yielding an array of strings, read with
/// `fdb_future_get_string_array`.
shared class StringFuture : FDBFutureBase!(StringFutureCallback, string[])
{
    mixin FDBFutureCtor;

    /// Converts each entry of the native string array with `to!string`;
    /// returns `string[].init` when the native call fails.
    override string[] extractValue(SFH fh, out SE err)
    {
        char ** entries;
        int     entryCount;

        err = fdb_future_get_string_array(
            cast(FutureHandle)fh,
            &entries,
            &entryCount);
        if (err != FDBError.SUCCESS)
            return typeof(return).init;

        return entries[0..entryCount].map!(to!string).array;
    }
}
399 
/// A `VoidFuture` that cancels its native future when it is destructed.
shared class WatchFuture : VoidFuture
{
    mixin FDBFutureCtor;

    ~this()
    {
        cancel;
    }

    /// Calls `fdb_future_cancel` on the native future; does nothing when
    /// the handle is already null.
    void cancel()
    {
        if (!fh)
            return;

        fdb_future_cancel(cast(FutureHandle)fh);
    }
}
415 
/// Constructs a `shared` instance of the future class `F`, forwarding
/// `args` to its constructor.
auto createFuture(F, Args...)(Args args)
{
    return new shared F(args);
}
421 
/// Wraps the function `fun` in a `FunctionFuture` constructed with `args`;
/// `pool` is passed through to `FunctionFuture`.
auto createFuture(alias fun, bool pool = true, Args...)(Args args)
if (isSomeFunction!fun)
{
    return new shared FunctionFuture!(fun, pool, Args)(args);
}
428 
/// Creates a future of type `F` from `args` and, when `callback` is
/// non-null, starts it with that callback. Returns the created future.
auto startOrCreateFuture(F, C, Args...)(Args args, C callback)
{
    auto created = createFuture!F(args);
    if (callback)
    {
        created.start(callback);
    }
    return created;
}
436 
/// Calls `await` on each of the given futures, in argument order.
void await(F...)(F futures)
{
    foreach (pending; futures)
    {
        pending.await;
    }
}