module fdb.future;

import
    core.sync.semaphore,
    core.thread;

import
    std.algorithm,
    std.array,
    std.conv,
    std.exception,
    std.parallelism,
    std.traits;

import
    fdb.disposable,
    fdb.error,
    fdb.fdb_c,
    fdb.range,
    fdb.rangeinfo,
    fdb.transaction;

/// Delegate invoked once an asynchronous operation has finished.
/// `ex` is `null` on success, otherwise the failure that occurred.
alias CompletionCallback = void delegate(Exception ex);

/**
 * Common base of all shared futures in this module.
 *
 * Holds the eventual result (`value`, absent when `V` is `void`) and the
 * failure (`exception`) and declares the blocking `await` contract.
 */
shared class FutureBase(V)
{
    static if (!is(V == void))
    {
        protected V value;
    }

    protected Exception exception;

    /// Blocks until the result is available and returns it.
    abstract shared(V) await();
}

/**
 * Future that runs `fun(args)` on the default `taskPool`.
 *
 * The task is queued from the constructor; `await` yields until the task
 * has completed and rethrows any `Exception` the task raised.
 */
shared class FunctionFuture(alias fun, Args...) :
    FutureBase!(ReturnType!fun),

    // dummy implementation to allow storage in KeyValueFuture
    IDisposable
{
    alias V = ReturnType!fun;
    alias T = Task!(fun, ParameterTypeTuple!fun) *;
    private T t;

    /// Creates the task for `fun(args)` and submits it to `taskPool`.
    this(Args args)
    {
        t = cast(shared)task!fun(args);
        auto localTask = cast(T)t;
        //localTask.executeInNewThread;
        taskPool.put(localTask);
    }

    /// No resources to release; present only to satisfy `IDisposable`.
    void dispose() {}

    /**
     * Waits for the task to finish.
     *
     * Returns: the task's result (nothing when `V` is `void`).
     * Throws:  the `Exception` raised by the task, if any.
     */
    override shared(V) await()
    {
        try
        {
            // Strip `shared` so the Task API can be used on the handle.
            auto localTask = cast(T)t;
            static if (!is(V == void))
                // Fixed: the original referenced the undeclared identifier
                // `localtask`. The result is cast to `shared` because it is
                // stored in a field of a shared class.
                value = cast(shared)localTask.yieldForce;
            else
                localTask.yieldForce;
        }
        catch (Exception ex)
        {
            exception = cast(shared)ex;
        }

        enforce(exception is null, cast(Exception)exception);
        static if (!is(V == void))
            return value;
    }
}

/**
 * Minimal thread-local (non-shared) future backed by a semaphore.
 *
 * A producer calls `notify` to publish the outcome; a consumer calls
 * `await`, which waits on the semaphore and then returns the value or
 * throws the stored exception.
 */
class BasicFuture(V)
{
    private Exception exception;
    private Semaphore event;

    static if (!is(V == void))
        private V value;

    this()
    {
        event = new Semaphore;
    }

    static if (!is(V == void))
        /// Stores the outcome (`ex` and `val`) and signals the semaphore.
        void notify(Exception ex, ref V val)
        {
            exception = ex;
            value     = val;
            event.notify;
        }
    else
        /// Stores the outcome (`ex`) and signals the semaphore.
        void notify(Exception ex)
        {
            exception = ex;
            event.notify;
        }

    /**
     * Waits until `notify` has been called.
     *
     * Returns: the stored value (nothing when `V` is `void`).
     * Throws:  the exception handed to `notify`, if it was not `null`.
     */
    V await()
    {
        event.wait;
        enforce(exception is null, cast(Exception)exception);
        static if (!is(V == void))
            return value;
    }
}

/// Delegate receiving the outcome of a future that produces a `V`.
alias FutureCallback(V) = void delegate(Exception ex, V value);

/**
 * Base class of every future that wraps a native FoundationDB
 * `FutureHandle`.
 *
 * Params (template):
 *   C = callback delegate type invoked when the future becomes ready
 *   V = type of the value produced by the future (`void` for none)
 *
 * Subclasses implement `extractValue` to pull the typed result (and the
 * error code) out of the native handle.
 */
shared class FDBFutureBase(C, V) : FutureBase!V, IDisposable
{
    private alias SF  = shared FDBFutureBase!(C, V);
    private alias SFH = shared FutureHandle;
    private alias SE  = shared fdb_error_t;

    private FutureHandle fh;
    private Transaction  tr;
    private C            callbackFunc;

    this(FutureHandle fh, shared Transaction tr)
    {
        this.fh = cast(shared)fh;
        this.tr = tr;
    }

    ~this()
    {
        dispose;
    }

    /// Destroys the native future handle. Safe to call more than once.
    void dispose()
    {
        if (!fh) return;

        // NB : Also releases the memory returned by get functions
        fdb_future_destroy(cast(FutureHandle)fh);
        fh = null;
    }

    /**
     * Registers `callbackFunc` to be called once the native future is ready.
     *
     * Returns: this future, to allow chaining.
     * Throws:  whatever `enforceError` raises when registering the native
     *          callback fails.
     */
    auto start(C callbackFunc)
    {
        this.callbackFunc = cast(shared)callbackFunc;
        const auto err = fdb_future_set_callback(
            cast(FutureHandle) fh,
            cast(FDBCallback)  &futureReady,
            cast(void*)        this);
        enforceError(err);

        return this;
    }

    /**
     * Blocks until the native future is ready.
     *
     * If `callbackFunc` is non-null it is registered through `start` before
     * blocking.
     *
     * Returns: the extracted value (nothing when `V` is `void`).
     * Throws:  the exception built from the error code when either waiting
     *          or the future itself failed.
     */
    shared(V) await(C callbackFunc)
    {
        if (callbackFunc)
            start(callbackFunc);

        shared err = fdb_future_block_until_ready(cast(FutureHandle)fh);
        if (err != FDBError.SUCCESS)
        {
            exception = cast(shared)err.toException;
            enforce(exception is null, cast(Exception)exception);
        }

        // The FoundationDB C API documents that fdb_future_block_until_ready
        // reports success even when the future is set to an error, so the
        // future's own status must always be fetched through extractValue.
        // The original skipped this for void futures and so reported failed
        // operations as successful.
        static if (!is(V == void))
            value = cast(shared)extractValue(fh, err);
        else
            extractValue(fh, err);

        // NOTE(review): relies on toException yielding null for a success
        // code, as the enforce below implies - confirm in fdb.error.
        exception = cast(shared)err.toException;

        enforce(exception is null, cast(Exception)exception);
        static if (!is(V == void))
            return value;
    }

    /// Blocks until the future is ready without registering a callback.
    override shared(V) await()
    {
        static if (!is(V == void))
            return await(null);
        else
            await(null);
    }

    /**
     * Native callback registered by `start`.
     *
     * Attaches the calling thread to the D runtime and hands the actual
     * work over to `taskPool`.
     */
    extern(C) static void futureReady(SFH f, SF thiz)
    {
        thread_attachThis;
        auto futureTask = task!worker(f, thiz);
        // or futureTask.executeInNewThread?
        taskPool.put(futureTask);
    }

    /// Extracts the result from `f` and forwards it to the user callback.
    static void worker(SFH f, SF thiz)
    {
        shared fdb_error_t err;
        with (thiz)
        {
            static if (is(V == void))
            {
                extractValue(cast(shared)f, err);
                if (callbackFunc)
                    (cast(C)callbackFunc)(err.toException);
            }
            else
            {
                auto value = extractValue(cast(shared)f, err);
                if (callbackFunc)
                    (cast(C)callbackFunc)(err.toException, value);
            }
        }
    }

    /**
     * Reads the typed result out of the native future.
     *
     * Params:
     *   fh  = native future handle
     *   err = receives the error code reported by the native getter
     */
    abstract V extractValue(SFH fh, out SE err);
}

/// Injects the standard `(FutureHandle, Transaction)` constructor.
private mixin template FDBFutureCtor()
{
    this(FutureHandle fh, shared Transaction tr = null)
    {
        super(fh, tr);
    }
}

alias ValueFutureCallback = FutureCallback!Value;

/// Future producing the value stored under a key (`null` when absent).
shared class ValueFuture : FDBFutureBase!(ValueFutureCallback, Value)
{
    mixin FDBFutureCtor;

    private alias PValue = ubyte *;

    override Value extractValue(SFH fh, out SE err)
    {
        PValue value;
        int    valueLength,
               valuePresent;

        err = fdb_future_get_value(
            cast(FutureHandle)fh,
            &valuePresent,
            &value,
            &valueLength);
        if (err != FDBError.SUCCESS || !valuePresent)
            return null;

        // Copy the bytes: the buffer belongs to the native future and is
        // released by dispose(), so a plain slice would dangle afterwards.
        return value[0..valueLength].dup;
    }
}

alias KeyFutureCallback = FutureCallback!Key;

/// Future producing a key.
shared class KeyFuture : FDBFutureBase!(KeyFutureCallback, Key)
{
    mixin FDBFutureCtor;

    private alias PKey = ubyte *;

    override Key extractValue(SFH fh, out SE err)
    {
        PKey key;
        int  keyLength;

        err = fdb_future_get_key(
            cast(FutureHandle)fh,
            &key,
            &keyLength);
        if (err != FDBError.SUCCESS)
            return typeof(return).init;

        // Copy the bytes: the buffer belongs to the native future and is
        // released by dispose(), so a plain slice would dangle afterwards.
        return key[0..keyLength].dup;
    }
}

alias VoidFutureCallback = void delegate(Exception ex);

/// Future without a value; only its error status is of interest.
shared class VoidFuture : FDBFutureBase!(VoidFutureCallback, void)
{
    mixin FDBFutureCtor;

    override void extractValue(SFH fh, out SE err)
    {
        err = fdb_future_get_error(
            cast(FutureHandle)fh);
    }
}

alias KeyValueFutureCallback    = FutureCallback!RecordRange;
alias ForEachCallback           = void delegate(Record record);
alias BreakableForEachCallback  = void delegate(
    Record   record,
    out bool breakLoop);

/// Future producing a range of key/value records.
shared class KeyValueFuture
    : FDBFutureBase!(KeyValueFutureCallback, RecordRange)
{
    const RangeInfo info;

    // Futures created by forEach, appended under the object's monitor.
    private IDisposable[] futures;

    this(FutureHandle fh, shared Transaction tr, RangeInfo info)
    {
        super(fh, tr);

        this.info = cast(shared)info;
    }

    override RecordRange extractValue(SFH fh, out SE err)
    {
        FDBKeyValue * kvs;
        int len;
        // Receives true if there are more results, or false if all results
        // have been transmitted
        fdb_bool_t more;
        err = fdb_future_get_keyvalue_array(
            cast(FutureHandle)fh,
            &kvs,
            &len,
            &more);
        if (err != FDBError.SUCCESS)
            return typeof(return).init;

        // Keys and values are duplicated so the records do not point into
        // the native buffers.
        auto records = minimallyInitializedArray!(Record[])(len);
        foreach (i, kv; kvs[0..len])
        {
            records[i].key   = (cast(Key)  kv.key  [0..kv.key_length  ]).dup;
            records[i].value = (cast(Value)kv.value[0..kv.value_length]).dup;
        }

        return RecordRange(
            records,
            cast(bool)more,
            cast(RangeInfo)info,
            tr);
    }

    /**
     * Iterates asynchronously over the records of this future.
     *
     * Params:
     *   fun = delegate called for every record; a two-parameter delegate
     *         may stop the iteration through its `out bool` argument
     *   cb  = completion callback, called with `null` on success or with
     *         the exception that aborted the iteration
     *
     * Returns: the future running the iteration.
     */
    auto forEach(FC)(FC fun, CompletionCallback cb)
    {
        auto future = createFuture!(foreachTask!FC)(this, fun, cb);
        synchronized (this)
            futures ~= future;
        return future;
    }

    /// Task body behind `forEach`; see there for the parameter contract.
    static void foreachTask(FC)(
        shared KeyValueFuture future,
        FC                    fun,
        CompletionCallback    cb)
    {
        Exception failure;
        try
        {
            // This will block until value is ready
            auto range = cast(RecordRange)future.await;
            foreach (kv; range)
            {
                static if (arity!fun == 2)
                {
                    bool breakLoop;
                    fun(kv, breakLoop);
                    if (breakLoop) break;
                }
                else
                    fun(kv);
            }
        }
        catch (Exception ex)
        {
            failure = ex;
        }

        // Invoked outside the try block so that the completion callback runs
        // exactly once; previously an exception thrown by cb(null) caused a
        // second invocation with that very exception.
        cb(failure);
    }
}

alias VersionFutureCallback = FutureCallback!ulong;

/// Future producing a database version.
shared class VersionFuture : FDBFutureBase!(VersionFutureCallback, ulong)
{
    mixin FDBFutureCtor;

    override ulong extractValue(SFH fh, out SE err)
    {
        long ver;
        err = fdb_future_get_version(
            cast(FutureHandle)fh,
            &ver);
        if (err != FDBError.SUCCESS)
            return typeof(return).init;
        return ver;
    }
}

alias StringFutureCallback = FutureCallback!(string[]);

/// Future producing an array of strings.
shared class StringFuture : FDBFutureBase!(StringFutureCallback, string[])
{
    mixin FDBFutureCtor;

    override string[] extractValue(SFH fh, out SE err)
    {
        char ** stringArr;
        int     count;
        err = fdb_future_get_string_array(
            cast(FutureHandle)fh,
            &stringArr,
            &count);
        if (err != FDBError.SUCCESS)
            return typeof(return).init;
        auto strings = stringArr[0..count].map!(to!string).array;
        return strings;
    }
}

/// Void future that cancels the native future when it is destroyed.
shared class WatchFuture : VoidFuture
{
    mixin FDBFutureCtor;

    ~this()
    {
        cancel;
    }

    /// Cancels the native future if the handle is still alive.
    void cancel()
    {
        if (fh)
            fdb_future_cancel(cast(FutureHandle)fh);
    }
}

/// Creates a `BasicFuture` for values of type `T`.
auto createFuture(T)()
{
    auto future = new BasicFuture!T;
    return future;
}

/// Creates a shared future of class `F`, forwarding `args` to its constructor.
auto createFuture(F, Args...)(Args args)
{
    auto future = new shared F(args);
    return future;
}

/// Creates a `FunctionFuture` that runs `fun(args)`.
auto createFuture(alias fun, Args...)(Args args)
if (isSomeFunction!fun)
{
    auto future = new shared FunctionFuture!(fun, Args)(args);
    return future;
}

/// Creates a future of class `F` and starts it when `callback` is non-null.
auto startOrCreateFuture(F, C, Args...)(
    Args args,
    C    callback)
{
    auto future = createFuture!F(args);
    if (callback)
        future.start(callback);
    return future;
}

/// Awaits every given future in order.
void await(F...)(F futures)
{
    foreach (f; futures)
        f.await;
}