1 module fdb.future;
2 
3 import
4     core.sync.semaphore,
5     core.thread;
6 
7 import
8     std.algorithm,
9     std.array,
10     std.conv,
11     std.exception,
12     std.parallelism,
13     std.traits;
14 
15 import
16     fdb.disposable,
17     fdb.error,
18     fdb.fdb_c,
19     fdb.range,
20     fdb.rangeinfo,
21     fdb.transaction;
22 
23 alias CompletionCallback = void delegate(Exception ex);
24 
25 shared class FutureBase(V)
26 {
27     static if (!is(V == void))
28     {
29         protected V value;
30     }
31 
32     protected Exception exception;
33 
34     abstract shared(V) await();
35 }
36 
37 shared class FunctionFuture(alias fun, Args...) :
38     FutureBase!(ReturnType!fun),
39 
40     // dummy implementation to allow storage in KeyValueFuture
41     IDisposable
42 {
43     alias V = ReturnType!fun;
44     alias T = Task!(fun, ParameterTypeTuple!fun) *;
45     private T t;
46 
47     this(Args args)
48     {
49         t = cast(shared)task!fun(args);
50         auto localTask = cast(T)t;
51         //localTask.executeInNewThread;
52         taskPool.put(localTask);
53     }
54 
55     void dispose() {}
56 
57     override shared(V) await()
58     {
59         try
60         {
61             auto localTask = cast(T)t;
62             static if (!is(V == void))
63                 value = localtask.yieldForce;
64             else
65                 localTask.yieldForce;
66         }
67         catch (Exception ex)
68         {
69             exception = cast(shared)ex;
70         }
71 
72         enforce(exception is null, cast(Exception)exception);
73         static if (!is(V == void))
74             return value;
75     }
76 }
77 
78 class BasicFuture(V)
79 {
80     private Exception   exception;
81     private Semaphore   event;
82 
83     static if (!is(V == void))
84         private V       value;
85 
86     this()
87     {
88         event = new Semaphore;
89     }
90 
91     static if (!is(V == void))
92         void notify(Exception ex, ref V val)
93         {
94             exception   = ex;
95             value       = val;
96             event.notify;
97         }
98     else
99         void notify(Exception ex)
100         {
101             exception   = ex;
102             event.notify;
103         }
104 
105     V await()
106     {
107         event.wait;
108         enforce(exception is null, cast(Exception)exception);
109         static if (!is(V == void))
110             return value;
111     }
112 }
113 
114 alias FutureCallback(V) = void delegate(Exception ex, V value);
115 
116 shared class FDBFutureBase(C, V) : FutureBase!V, IDisposable
117 {
118     private alias SF    = shared FDBFutureBase!(C, V);
119     private alias SFH   = shared FutureHandle;
120     private alias SE    = shared fdb_error_t;
121 
122     private FutureHandle fh;
123     private Transaction  tr;
124     private C            callbackFunc;
125 
126     this(FutureHandle fh, shared Transaction tr)
127     {
128         this.fh = cast(shared)fh;
129         this.tr = tr;
130     }
131 
132     ~this()
133     {
134         dispose;
135     }
136 
137     void dispose()
138     {
139         if (!fh) return;
140 
141         // NB : Also releases the memory returned by get functions
142         fdb_future_destroy(cast(FutureHandle)fh);
143         fh = null;
144     }
145 
146     auto start(C callbackFunc)
147     {
148         this.callbackFunc = cast(shared)callbackFunc;
149         const auto err = fdb_future_set_callback(
150             cast(FutureHandle) fh,
151             cast(FDBCallback)  &futureReady,
152             cast(void*)        this);
153         enforceError(err);
154 
155         return this;
156     }
157 
158     shared(V) await(C callbackFunc)
159     {
160         if (callbackFunc)
161             start(callbackFunc);
162 
163         shared err = fdb_future_block_until_ready(cast(FutureHandle)fh);
164         if (err != FDBError.SUCCESS)
165         {
166             exception = cast(shared)err.toException;
167             enforce(exception is null, cast(Exception)exception);
168         }
169 
170         static if (!is(V == void))
171             value  = cast(shared)extractValue(fh, err);
172 
173         exception  = cast(shared)err.toException;
174 
175         enforce(exception is null, cast(Exception)exception);
176         static if (!is(V == void))
177             return value;
178     }
179 
180     override shared(V) await()
181     {
182         static if (!is(V == void))
183             return await(null);
184         else
185             await(null);
186     }
187 
188     extern(C) static void futureReady(SFH f, SF thiz)
189     {
190         thread_attachThis;
191         auto futureTask = task!worker(f, thiz);
192         // or futureTask.executeInNewThread?
193         taskPool.put(futureTask);
194     }
195 
196     static void worker(SFH f, SF thiz)
197     {
198         shared fdb_error_t err;
199         with (thiz)
200         {
201             static if (is(V == void))
202             {
203                 extractValue(cast(shared)f, err);
204                 if (callbackFunc)
205                     (cast(C)callbackFunc)(err.toException);
206             }
207             else
208             {
209                 auto value = extractValue(cast(shared)f, err);
210                 if (callbackFunc)
211                     (cast(C)callbackFunc)(err.toException, value);
212             }
213         }
214     }
215 
216     abstract V extractValue(SFH fh, out SE err);
217 }
218 
219 private mixin template FDBFutureCtor()
220 {
221     this(FutureHandle fh, shared Transaction tr = null)
222     {
223         super(fh, tr);
224     }
225 }
226 
227 alias ValueFutureCallback = FutureCallback!Value;
228 
229 shared class ValueFuture : FDBFutureBase!(ValueFutureCallback, Value)
230 {
231     mixin FDBFutureCtor;
232 
233     private alias PValue = ubyte *;
234 
235     override Value extractValue(SFH fh, out SE err)
236     {
237         PValue value;
238         int    valueLength,
239                valuePresent;
240 
241         err = fdb_future_get_value(
242             cast(FutureHandle)fh,
243             &valuePresent,
244             &value,
245             &valueLength);
246         if (err != FDBError.SUCCESS || !valuePresent)
247             return null;
248         return value[0..valueLength];
249     }
250 }
251 
252 alias KeyFutureCallback = FutureCallback!Key;
253 
254 shared class KeyFuture : FDBFutureBase!(KeyFutureCallback, Key)
255 {
256     mixin FDBFutureCtor;
257 
258     private alias PKey = ubyte *;
259 
260     override Key extractValue(SFH fh, out SE err)
261     {
262         PKey key;
263         int  keyLength;
264 
265         err = fdb_future_get_key(
266             cast(FutureHandle)fh,
267             &key,
268             &keyLength);
269         if (err != FDBError.SUCCESS)
270             return typeof(return).init;
271         return key[0..keyLength];
272     }
273 }
274 
275 alias VoidFutureCallback = void delegate(Exception ex);
276 
277 shared class VoidFuture : FDBFutureBase!(VoidFutureCallback, void)
278 {
279     mixin FDBFutureCtor;
280 
281     override void extractValue(SFH fh, out SE err)
282     {
283         err = fdb_future_get_error(
284             cast(FutureHandle)fh);
285     }
286 }
287 
288 alias KeyValueFutureCallback    = FutureCallback!RecordRange;
289 alias ForEachCallback           = void delegate(Record record);
290 alias BreakableForEachCallback  = void delegate(
291     Record record,
292     out bool breakLoop);
293 
294 shared class KeyValueFuture
295     : FDBFutureBase!(KeyValueFutureCallback, RecordRange)
296 {
297     const RangeInfo info;
298 
299     private IDisposable[] futures;
300 
301     this(FutureHandle fh, shared Transaction tr, RangeInfo info)
302     {
303         super(fh, tr);
304 
305         this.info = cast(shared)info;
306     }
307 
308     override RecordRange extractValue(SFH fh, out SE err)
309     {
310         FDBKeyValue * kvs;
311         int len;
312         // Receives true if there are more result, or false if all results have
313         // been transmitted
314         fdb_bool_t more;
315         err = fdb_future_get_keyvalue_array(
316             cast(FutureHandle)fh,
317             &kvs,
318             &len,
319             &more);
320         if (err != FDBError.SUCCESS)
321             return typeof(return).init;
322 
323         auto records = minimallyInitializedArray!(Record[])(len);
324         foreach (i, kv; kvs[0..len])
325         {
326             records[i].key   = (cast(Key)  kv.key  [0..kv.key_length  ]).dup;
327             records[i].value = (cast(Value)kv.value[0..kv.value_length]).dup;
328         }
329 
330         return RecordRange(
331             records,
332             cast(bool)more,
333             cast(RangeInfo)info,
334             tr);
335     }
336 
337     auto forEach(FC)(FC fun, CompletionCallback cb)
338     {
339         auto future  = createFuture!(foreachTask!FC)(this, fun, cb);
340         synchronized (this)
341             futures ~= future;
342         return future;
343     }
344 
345     static void foreachTask(FC)(
346         shared KeyValueFuture future,
347         FC                    fun,
348         CompletionCallback    cb)
349     {
350         try
351         {
352             // This will block until value is ready
353             auto range = cast(RecordRange)future.await;
354             foreach (kv; range)
355             {
356                 static if (arity!fun == 2)
357                 {
358                     bool breakLoop;
359                     fun(kv, breakLoop);
360                     if (breakLoop) break;
361                 }
362                 else
363                     fun(kv);
364             }
365 
366             cb(null);
367         }
368         catch (Exception ex)
369         {
370             cb(ex);
371         }
372     }
373 }
374 
375 alias VersionFutureCallback = FutureCallback!ulong;
376 
377 shared class VersionFuture : FDBFutureBase!(VersionFutureCallback, ulong)
378 {
379     mixin FDBFutureCtor;
380 
381     override ulong extractValue(SFH fh, out SE err)
382     {
383         long ver;
384         err = fdb_future_get_version(
385             cast(FutureHandle)fh,
386             &ver);
387         if (err != FDBError.SUCCESS)
388             return typeof(return).init;
389         return ver;
390     }
391 }
392 
393 alias StringFutureCallback = FutureCallback!(string[]);
394 
395 shared class StringFuture : FDBFutureBase!(StringFutureCallback, string[])
396 {
397     mixin FDBFutureCtor;
398 
399     override string[] extractValue(SFH fh, out SE err)
400     {
401         char ** stringArr;
402         int     count;
403         err = fdb_future_get_string_array(
404             cast(FutureHandle)fh,
405             &stringArr,
406             &count);
407         if (err != FDBError.SUCCESS)
408             return typeof(return).init;
409         auto strings = stringArr[0..count].map!(to!string).array;
410         return strings;
411     }
412 }
413 
414 shared class WatchFuture : VoidFuture
415 {
416     mixin FDBFutureCtor;
417 
418     ~this()
419     {
420         cancel;
421     }
422 
423     void cancel()
424     {
425         if (fh)
426             fdb_future_cancel(cast(FutureHandle)fh);
427     }
428 }
429 
430 auto createFuture(T)()
431 {
432     auto future = new BasicFuture!T;
433     return future;
434 }
435 
436 auto createFuture(F, Args...)(Args args)
437 {
438     auto future = new shared F(args);
439     return future;
440 }
441 
442 auto createFuture(alias fun, Args...)(Args args)
443 if (isSomeFunction!fun)
444 {
445     auto future = new shared FunctionFuture!(fun, Args)(args);
446     return future;
447 }
448 
449 auto startOrCreateFuture(F, C, Args...)(
450     Args args,
451     C callback)
452 {
453     auto future = createFuture!F(args);
454     if (callback)
455         future.start(callback);
456     return future;
457 }
458 
459 void await(F...)(F futures)
460 {
461     foreach (f; futures)
462         f.await;
463 }