module fdb.future;

import
    core.sync.semaphore,
    core.thread;

import
    std.algorithm,
    std.array,
    std.conv,
    std.exception,
    std.parallelism,
    std.traits;

import
    fdb.disposable,
    fdb.error,
    fdb.fdb_c,
    fdb.range,
    fdb.rangeinfo,
    fdb.transaction;

/// Completion notification used by `KeyValueFuture.forEach`: invoked with
/// `null` when iteration finished, or with the caught exception otherwise.
alias CompletionCallback = void delegate(Exception ex);

/// Injects two constructors that forward their arguments to the base class
/// constructor.
private mixin template ExceptionCtorMixin()
{
    this(string msg = null, Throwable next = null)
    {
        super(msg, next);
    }

    this(string msg, string file, size_t line, Throwable next = null)
    {
        super(msg, file, line, next);
    }
}

/// Exception type declared for future-related failures.
class FutureException : Exception
{
    mixin ExceptionCtorMixin;
}

/// Common base of all futures: stores the produced value (when `V` is not
/// `void`) and the exception captured while producing it.
shared class FutureBase(V)
{
    static if (!is(V == void))
    {
        protected V value;
    }

    protected Exception exception;

    /// Waits for the result; implemented by subclasses.
    abstract shared(V) await();
}

/// Future that runs `fun` as a `std.parallelism` task.
///
/// Params:
///     fun  = function to run
///     pool = when `true` the task is put on `taskPool`, otherwise it is
///            executed in a new thread
///     Args = types of the arguments passed to the constructor
shared class FunctionFuture(alias fun, bool pool = true, Args...) :
    FutureBase!(ReturnType!fun),

    // dummy implementation to allow storage in KeyValueFuture
    IDisposable
{
    alias V = ReturnType!fun;
    alias T = Task!(fun, ParameterTypeTuple!fun) *;
    private T t;

    /// Creates the task for `fun(args)` and schedules it immediately.
    this(Args args)
    {
        t = cast(shared)task!fun(args);
        // Work on an unshared view of the task pointer to call its methods.
        auto localTask = cast(T)t;
        static if (pool)
            taskPool.put(localTask);
        else
            localTask.executeInNewThread;
    }

    /// No-op; present only to satisfy `IDisposable`.
    void dispose() {}

    /// Waits for the task via `yieldForce` and returns its result.
    ///
    /// Any `Exception` raised by the task is stored in `exception` and then
    /// rethrown through `enforce`.
    override shared(V) await()
    {
        try
        {
            auto localTask = cast(T)t;
            static if (!is(V == void))
                // Fixed: the identifier was misspelled `localtask`, which
                // does not exist (D is case-sensitive).
                value = localTask.yieldForce;
            else
                localTask.yieldForce;
        }
        catch (Exception ex)
        {
            exception = cast(shared)ex;
        }

        enforce(exception is null, cast(Exception)exception);
        static if (!is(V == void))
            return value;
    }
}

/// Callback shape for value-producing futures: receives the exception (if
/// any) and the extracted value.
alias FutureCallback(V) = void delegate(Exception ex, V value);

/// Base class for futures backed by a native `FutureHandle`.
///
/// Params:
///     C = callback delegate type
///     V = type of the extracted value (`void` for none)
shared class FDBFutureBase(C, V) : FutureBase!V, IDisposable
{
    private alias SF  = shared FDBFutureBase!(C, V);
    private alias SFH = shared FutureHandle;
    private alias SE  = shared fdb_error_t;

    private FutureHandle fh;
    private Transaction  tr;
    private C            callbackFunc;

    /// Stores the native handle and the transaction it belongs to.
    this(FutureHandle fh, shared Transaction tr)
    {
        this.fh = cast(shared)fh;
        this.tr = tr;
    }

    ~this()
    {
        dispose;
    }

    /// Destroys the native future once; later calls are no-ops because the
    /// handle is reset to `null`.
    void dispose()
    {
        if (!fh) return;

        // NB : Also releases the memory returned by get functions
        fdb_future_destroy(cast(FutureHandle)fh);
        fh = null;
    }

    /// Remembers `callbackFunc` and registers `futureReady` as the native
    /// callback of the handle, passing `this` as the callback parameter.
    ///
    /// Returns: this future, to allow chaining.
    /// Throws: whatever `enforceError` throws for a failing error code.
    auto start(C callbackFunc)
    {
        this.callbackFunc = cast(shared)callbackFunc;
        const auto err = fdb_future_set_callback(
            cast(FutureHandle) fh,
            cast(FDBCallback)  &futureReady,
            cast(void*)        this);
        enforceError(err);

        return this;
    }

    /// Blocks until the native future is ready, then extracts its value.
    ///
    /// Params:
    ///     callbackFunc = when non-null, registered through `start` before
    ///                    blocking
    /// Returns: the extracted value (nothing when `V` is `void`).
    /// Throws: the exception produced by `toException` for the error code
    ///         reported while blocking or while extracting the value.
    shared(V) await(C callbackFunc)
    {
        if (callbackFunc)
            start(callbackFunc);

        shared err = fdb_future_block_until_ready(cast(FutureHandle)fh);
        if (err != FDBError.SUCCESS)
        {
            exception = cast(shared)err.toException;
            enforce(exception is null, cast(Exception)exception);
        }

        static if (!is(V == void))
            value = cast(shared)extractValue(fh, err);
        else
            extractValue(fh, err);

        // extractValue reports its own status through `err`.
        exception = cast(shared)err.toException;

        enforce(exception is null, cast(Exception)exception);
        static if (!is(V == void))
            return value;
    }

    /// Blocking wait without registering a callback.
    override shared(V) await()
    {
        static if (!is(V == void))
            return await(null);
        else
            await(null);
    }

    /// Native callback registered by `start`; hands the real work to
    /// `worker` on the task pool.
    extern(C) static void futureReady(SFH f, SF thiz)
    {
        // NOTE(review): presumably invoked on a thread not created by the
        // D runtime, hence the attach - confirm.
        thread_attachThis;
        auto futureTask = task!worker(f, thiz);
        // or futureTask.executeInNewThread?
        taskPool.put(futureTask);
    }

    /// Extracts the value from `f` and, if a callback was registered,
    /// invokes it with the resulting exception (and value, unless `V` is
    /// `void`).
    static void worker(SFH f, SF thiz)
    {
        shared fdb_error_t err;
        with (thiz)
        {
            static if (is(V == void))
            {
                extractValue(cast(shared)f, err);
                if (callbackFunc)
                    (cast(C)callbackFunc)(err.toException);
            }
            else
            {
                auto value = extractValue(cast(shared)f, err);
                if (callbackFunc)
                    (cast(C)callbackFunc)(err.toException, value);
            }
        }
    }

    /// Reads the result out of the native future.
    ///
    /// Params:
    ///     fh  = native future handle
    ///     err = receives the error code of the native get call
    abstract V extractValue(SFH fh, out SE err);
}

/// Injects the standard (handle, transaction) constructor that forwards to
/// the base class.
private mixin template FDBFutureCtor()
{
    this(FutureHandle fh, shared Transaction tr = null)
    {
        super(fh, tr);
    }
}

alias ValueFutureCallback = FutureCallback!Value;

/// Future yielding a single `Value`.
shared class ValueFuture : FDBFutureBase!(ValueFutureCallback, Value)
{
    mixin FDBFutureCtor;

    private alias PValue = ubyte *;

    /// Returns `null` on error or when no value is present; otherwise a
    /// slice over the buffer returned by `fdb_future_get_value` (not
    /// copied).
    override Value extractValue(SFH fh, out SE err)
    {
        PValue value;
        int    valueLength,
               valuePresent;

        err = fdb_future_get_value(
            cast(FutureHandle)fh,
            &valuePresent,
            &value,
            &valueLength);
        if (err != FDBError.SUCCESS || !valuePresent)
            return null;
        return value[0..valueLength];
    }
}

alias KeyFutureCallback = FutureCallback!Key;

/// Future yielding a single `Key`.
shared class KeyFuture : FDBFutureBase!(KeyFutureCallback, Key)
{
    mixin FDBFutureCtor;

    private alias PKey = ubyte *;

    /// Returns the default `Key` on error; otherwise a slice over the
    /// buffer returned by `fdb_future_get_key` (not copied).
    override Key extractValue(SFH fh, out SE err)
    {
        PKey key;
        int  keyLength;

        err = fdb_future_get_key(
            cast(FutureHandle)fh,
            &key,
            &keyLength);
        if (err != FDBError.SUCCESS)
            return typeof(return).init;
        return key[0..keyLength];
    }
}

alias VoidFutureCallback = void delegate(Exception ex);

/// Future without a value; only its error status is read.
shared class VoidFuture : FDBFutureBase!(VoidFutureCallback, void)
{
    mixin FDBFutureCtor;

    /// Stores the result of `fdb_future_get_error` in `err`.
    override void extractValue(SFH fh, out SE err)
    {
        err = fdb_future_get_error(
            cast(FutureHandle)fh);
    }
}

alias KeyValueFutureCallback   = FutureCallback!RecordRange;
alias ForEachCallback          = void delegate(Record record);
alias BreakableForEachCallback = void delegate(
    Record   record,
    out bool breakLoop);

/// Future yielding a `RecordRange` for a range read.
shared class KeyValueFuture
    : FDBFutureBase!(KeyValueFutureCallback, RecordRange)
{
    const RangeInfo info;

    // Futures created by forEach are kept here.
    private IDisposable[] futures;

    /// Stores the handle, the transaction and the range description.
    this(FutureHandle fh, shared Transaction tr, RangeInfo info)
    {
        super(fh, tr);

        this.info = cast(shared)info;
    }

    /// Copies every key/value pair of the native result into a fresh
    /// `Record` array and wraps it in a `RecordRange`.
    ///
    /// Returns: the default `RecordRange` on error.
    override RecordRange extractValue(SFH fh, out SE err)
    {
        FDBKeyValue * kvs;
        int len;
        // Receives true if there are more results, or false if all results
        // have been transmitted
        fdb_bool_t more;
        err = fdb_future_get_keyvalue_array(
            cast(FutureHandle)fh,
            &kvs,
            &len,
            &more);
        if (err != FDBError.SUCCESS)
            return typeof(return).init;

        auto records = minimallyInitializedArray!(Record[])(len);
        foreach (i, kv; kvs[0..len])
        {
            // .dup copies the bytes out of the native buffers.
            records[i].key   = (cast(Key)  kv.key  [0..kv.key_length  ]).dup;
            records[i].value = (cast(Value)kv.value[0..kv.value_length]).dup;
        }

        return RecordRange(
            records,
            cast(bool)more,
            cast(RangeInfo)info,
            tr);
    }

    /// Starts a `FunctionFuture` running `foreachTask` over this future's
    /// records and remembers it in `futures`.
    ///
    /// Params:
    ///     fun = delegate called for each record
    ///     cb  = completion callback
    /// Returns: the created future.
    auto forEach(FC)(FC fun, CompletionCallback cb)
    {
        auto future = createFuture!(foreachTask!FC)(this, fun, cb);
        synchronized (this)
            futures ~= future;
        return future;
    }

    /// Awaits `future`, calls `fun` for each record and reports the outcome
    /// through `cb` (`null` on success, the caught exception otherwise).
    ///
    /// A two-parameter `fun` may set its second argument to stop the loop.
    static void foreachTask(FC)(
        shared KeyValueFuture future,
        FC                    fun,
        CompletionCallback    cb)
    {
        try
        {
            // This will block until value is ready
            auto range = cast(RecordRange)future.await;
            foreach (kv; range)
            {
                static if (arity!fun == 2)
                {
                    bool breakLoop;
                    fun(kv, breakLoop);
                    if (breakLoop) break;
                }
                else
                    fun(kv);
            }

            cb(null);
        }
        catch (Exception ex)
        {
            cb(ex);
        }
    }
}

alias VersionFutureCallback = FutureCallback!ulong;

/// Future yielding a version number.
shared class VersionFuture : FDBFutureBase!(VersionFutureCallback, ulong)
{
    mixin FDBFutureCtor;

    /// Returns the value read by `fdb_future_get_version`, or `0` (the
    /// `ulong` default) on error.
    override ulong extractValue(SFH fh, out SE err)
    {
        long ver;
        err = fdb_future_get_version(
            cast(FutureHandle)fh,
            &ver);
        if (err != FDBError.SUCCESS)
            return typeof(return).init;
        return ver;
    }
}

alias StringFutureCallback = FutureCallback!(string[]);

/// Future yielding an array of strings.
shared class StringFuture : FDBFutureBase!(StringFutureCallback, string[])
{
    mixin FDBFutureCtor;

    /// Converts each entry of the native string array with `to!string`;
    /// returns `null` (the `string[]` default) on error.
    override string[] extractValue(SFH fh, out SE err)
    {
        char ** stringArr;
        int     count;
        err = fdb_future_get_string_array(
            cast(FutureHandle)fh,
            &stringArr,
            &count);
        if (err != FDBError.SUCCESS)
            return typeof(return).init;
        auto strings = stringArr[0..count].map!(to!string).array;
        return strings;
    }
}

/// `VoidFuture` that cancels its native future on destruction.
shared class WatchFuture : VoidFuture
{
    mixin FDBFutureCtor;

    ~this()
    {
        cancel;
    }

    /// Calls `fdb_future_cancel` if the handle is still set.
    void cancel()
    {
        if (fh)
            fdb_future_cancel(cast(FutureHandle)fh);
    }
}

/// Constructs a `shared F` from `args`.
auto createFuture(F, Args...)(Args args)
{
    auto future = new shared F(args);
    return future;
}

/// Constructs a `FunctionFuture` that runs `fun` with `args`.
auto createFuture(alias fun, bool pool = true, Args...)(Args args)
if (isSomeFunction!fun)
{
    auto future = new shared FunctionFuture!(fun, pool, Args)(args);
    return future;
}

/// Creates a future of type `F` and starts it when `callback` is non-null.
auto startOrCreateFuture(F, C, Args...)(Args args, C callback)
{
    auto future = createFuture!F(args);
    if (callback)
        future.start(callback);
    return future;
}

/// Awaits every given future in order.
void await(F...)(F futures)
{
    foreach (f; futures)
        f.await;
}