1 //===------- RPCUTils.h - Utilities for building RPC APIs -------*- C++ -*-===// 2 // 3 // The LLVM Compiler Infrastructure 4 // 5 // This file is distributed under the University of Illinois Open Source 6 // License. See LICENSE.TXT for details. 7 // 8 //===----------------------------------------------------------------------===// 9 // 10 // Utilities to support construction of simple RPC APIs. 11 // 12 // The RPC utilities aim for ease of use (minimal conceptual overhead) for C++ 13 // programmers, high performance, low memory overhead, and efficient use of the 14 // communications channel. 15 // 16 //===----------------------------------------------------------------------===// 17 18 #ifndef LLVM_EXECUTIONENGINE_ORC_RPCUTILS_H 19 #define LLVM_EXECUTIONENGINE_ORC_RPCUTILS_H 20 21 #include <map> 22 #include <thread> 23 #include <vector> 24 25 #include "llvm/ADT/STLExtras.h" 26 #include "llvm/ExecutionEngine/Orc/OrcError.h" 27 #include "llvm/ExecutionEngine/Orc/RPCSerialization.h" 28 29 #include <future> 30 31 namespace llvm { 32 namespace orc { 33 namespace rpc { 34 35 template <typename DerivedFunc, typename FnT> class Function; 36 37 // RPC Function class. 38 // DerivedFunc should be a user defined class with a static 'getName()' method 39 // returning a const char* representing the function's name. 40 template <typename DerivedFunc, typename RetT, typename... ArgTs> 41 class Function<DerivedFunc, RetT(ArgTs...)> { 42 public: 43 /// User defined function type. 44 using Type = RetT(ArgTs...); 45 46 /// Return type. 47 using ReturnType = RetT; 48 49 /// Returns the full function prototype as a string. 50 static const char *getPrototype() { 51 std::lock_guard<std::mutex> Lock(NameMutex); 52 if (Name.empty()) 53 raw_string_ostream(Name) 54 << RPCTypeName<RetT>::getName() << " " << DerivedFunc::getName() 55 << "(" << llvm::orc::rpc::RPCTypeNameSequence<ArgTs...>() << ")"; 56 return Name.data(); 57 } 58 59 private: 60 static std::mutex NameMutex; 61 static std::string Name; 62 }; 63 64 template <typename DerivedFunc, typename RetT, typename... ArgTs> 65 std::mutex Function<DerivedFunc, RetT(ArgTs...)>::NameMutex; 66 67 template <typename DerivedFunc, typename RetT, typename... ArgTs> 68 std::string Function<DerivedFunc, RetT(ArgTs...)>::Name; 69 70 /// Allocates RPC function ids during autonegotiation. 71 /// Specializations of this class must provide four members: 72 /// 73 /// static T getInvalidId(): 74 /// Should return a reserved id that will be used to represent missing 75 /// functions during autonegotiation. 76 /// 77 /// static T getResponseId(): 78 /// Should return a reserved id that will be used to send function responses 79 /// (return values). 80 /// 81 /// static T getNegotiateId(): 82 /// Should return a reserved id for the negotiate function, which will be used 83 /// to negotiate ids for user defined functions. 84 /// 85 /// template <typename Func> T allocate(): 86 /// Allocate a unique id for function Func. 87 template <typename T, typename = void> class RPCFunctionIdAllocator; 88 89 /// This specialization of RPCFunctionIdAllocator provides a default 90 /// implementation for integral types. 91 template <typename T> 92 class RPCFunctionIdAllocator< 93 T, typename std::enable_if<std::is_integral<T>::value>::type> { 94 public: 95 static T getInvalidId() { return T(0); } 96 static T getResponseId() { return T(1); } 97 static T getNegotiateId() { return T(2); } 98 99 template <typename Func> T allocate() { return NextId++; } 100 101 private: 102 T NextId = 3; 103 }; 104 105 namespace detail { 106 107 // FIXME: Remove MSVCPError/MSVCPExpected once MSVC's future implementation 108 // supports classes without default constructors. 109 #ifdef _MSC_VER 110 111 namespace msvc_hacks { 112 113 // Work around MSVC's future implementation's use of default constructors: 114 // A default constructed value in the promise will be overwritten when the 115 // real error is set - so the default constructed Error has to be checked 116 // already. 117 class MSVCPError : public Error { 118 public: 119 MSVCPError() { (void)!!*this; } 120 121 MSVCPError(MSVCPError &&Other) : Error(std::move(Other)) {} 122 123 MSVCPError &operator=(MSVCPError Other) { 124 Error::operator=(std::move(Other)); 125 return *this; 126 } 127 128 MSVCPError(Error Err) : Error(std::move(Err)) {} 129 }; 130 131 // Work around MSVC's future implementation, similar to MSVCPError. 132 template <typename T> class MSVCPExpected : public Expected<T> { 133 public: 134 MSVCPExpected() 135 : Expected<T>(make_error<StringError>("", inconvertibleErrorCode())) { 136 consumeError(this->takeError()); 137 } 138 139 MSVCPExpected(MSVCPExpected &&Other) : Expected<T>(std::move(Other)) {} 140 141 MSVCPExpected &operator=(MSVCPExpected &&Other) { 142 Expected<T>::operator=(std::move(Other)); 143 return *this; 144 } 145 146 MSVCPExpected(Error Err) : Expected<T>(std::move(Err)) {} 147 148 template <typename OtherT> 149 MSVCPExpected( 150 OtherT &&Val, 151 typename std::enable_if<std::is_convertible<OtherT, T>::value>::type * = 152 nullptr) 153 : Expected<T>(std::move(Val)) {} 154 155 template <class OtherT> 156 MSVCPExpected( 157 Expected<OtherT> &&Other, 158 typename std::enable_if<std::is_convertible<OtherT, T>::value>::type * = 159 nullptr) 160 : Expected<T>(std::move(Other)) {} 161 162 template <class OtherT> 163 explicit MSVCPExpected( 164 Expected<OtherT> &&Other, 165 typename std::enable_if<!std::is_convertible<OtherT, T>::value>::type * = 166 nullptr) 167 : Expected<T>(std::move(Other)) {} 168 }; 169 170 } // end namespace msvc_hacks 171 172 #endif // _MSC_VER 173 174 /// Provides a typedef for a tuple containing the decayed argument types. 175 template <typename T> class FunctionArgsTuple; 176 177 template <typename RetT, typename... ArgTs> 178 class FunctionArgsTuple<RetT(ArgTs...)> { 179 public: 180 using Type = std::tuple<typename std::decay< 181 typename std::remove_reference<ArgTs>::type>::type...>; 182 }; 183 184 // ResultTraits provides typedefs and utilities specific to the return type 185 // of functions. 186 template <typename RetT> class ResultTraits { 187 public: 188 // The return type wrapped in llvm::Expected. 189 using ErrorReturnType = Expected<RetT>; 190 191 #ifdef _MSC_VER 192 // The ErrorReturnType wrapped in a std::promise. 193 using ReturnPromiseType = std::promise<msvc_hacks::MSVCPExpected<RetT>>; 194 195 // The ErrorReturnType wrapped in a std::future. 196 using ReturnFutureType = std::future<msvc_hacks::MSVCPExpected<RetT>>; 197 #else 198 // The ErrorReturnType wrapped in a std::promise. 199 using ReturnPromiseType = std::promise<ErrorReturnType>; 200 201 // The ErrorReturnType wrapped in a std::future. 202 using ReturnFutureType = std::future<ErrorReturnType>; 203 #endif 204 205 // Create a 'blank' value of the ErrorReturnType, ready and safe to 206 // overwrite. 207 static ErrorReturnType createBlankErrorReturnValue() { 208 return ErrorReturnType(RetT()); 209 } 210 211 // Consume an abandoned ErrorReturnType. 212 static void consumeAbandoned(ErrorReturnType RetOrErr) { 213 consumeError(RetOrErr.takeError()); 214 } 215 }; 216 217 // ResultTraits specialization for void functions. 218 template <> class ResultTraits<void> { 219 public: 220 // For void functions, ErrorReturnType is llvm::Error. 221 using ErrorReturnType = Error; 222 223 #ifdef _MSC_VER 224 // The ErrorReturnType wrapped in a std::promise. 225 using ReturnPromiseType = std::promise<msvc_hacks::MSVCPError>; 226 227 // The ErrorReturnType wrapped in a std::future. 228 using ReturnFutureType = std::future<msvc_hacks::MSVCPError>; 229 #else 230 // The ErrorReturnType wrapped in a std::promise. 231 using ReturnPromiseType = std::promise<ErrorReturnType>; 232 233 // The ErrorReturnType wrapped in a std::future. 234 using ReturnFutureType = std::future<ErrorReturnType>; 235 #endif 236 237 // Create a 'blank' value of the ErrorReturnType, ready and safe to 238 // overwrite. 239 static ErrorReturnType createBlankErrorReturnValue() { 240 return ErrorReturnType::success(); 241 } 242 243 // Consume an abandoned ErrorReturnType. 244 static void consumeAbandoned(ErrorReturnType Err) { 245 consumeError(std::move(Err)); 246 } 247 }; 248 249 // ResultTraits<Error> is equivalent to ResultTraits<void>. This allows 250 // handlers for void RPC functions to return either void (in which case they 251 // implicitly succeed) or Error (in which case their error return is 252 // propagated). See usage in HandlerTraits::runHandlerHelper. 253 template <> class ResultTraits<Error> : public ResultTraits<void> {}; 254 255 // ResultTraits<Expected<T>> is equivalent to ResultTraits<T>. This allows 256 // handlers for RPC functions returning a T to return either a T (in which 257 // case they implicitly succeed) or Expected<T> (in which case their error 258 // return is propagated). See usage in HandlerTraits::runHandlerHelper. 259 template <typename RetT> 260 class ResultTraits<Expected<RetT>> : public ResultTraits<RetT> {}; 261 262 // Send a response of the given wire return type (WireRetT) over the 263 // channel, with the given sequence number. 264 template <typename WireRetT, typename HandlerRetT, typename ChannelT, 265 typename FunctionIdT, typename SequenceNumberT> 266 static Error respond(ChannelT &C, const FunctionIdT &ResponseId, 267 SequenceNumberT SeqNo, Expected<HandlerRetT> ResultOrErr) { 268 // If this was an error bail out. 269 // FIXME: Send an "error" message to the client if this is not a channel 270 // failure? 271 if (auto Err = ResultOrErr.takeError()) 272 return Err; 273 274 // Open the response message. 275 if (auto Err = C.startSendMessage(ResponseId, SeqNo)) 276 return Err; 277 278 // Serialize the result. 279 if (auto Err = 280 SerializationTraits<ChannelT, WireRetT, HandlerRetT>::serialize( 281 C, *ResultOrErr)) 282 return Err; 283 284 // Close the response message. 285 return C.endSendMessage(); 286 } 287 288 // Send an empty response message on the given channel to indicate that 289 // the handler ran. 290 template <typename WireRetT, typename ChannelT, typename FunctionIdT, 291 typename SequenceNumberT> 292 Error respond(ChannelT &C, const FunctionIdT &ResponseId, SequenceNumberT SeqNo, 293 Error Err) { 294 if (Err) 295 return Err; 296 if (auto Err2 = C.startSendMessage(ResponseId, SeqNo)) 297 return Err2; 298 return C.endSendMessage(); 299 } 300 301 // Converts a given type to the equivalent error return type. 302 template <typename T> class WrappedHandlerReturn { 303 public: 304 using Type = Expected<T>; 305 }; 306 307 template <typename T> class WrappedHandlerReturn<Expected<T>> { 308 public: 309 using Type = Expected<T>; 310 }; 311 312 template <> class WrappedHandlerReturn<void> { 313 public: 314 using Type = Error; 315 }; 316 317 template <> class WrappedHandlerReturn<Error> { 318 public: 319 using Type = Error; 320 }; 321 322 template <> class WrappedHandlerReturn<ErrorSuccess> { 323 public: 324 using Type = Error; 325 }; 326 327 // Traits class that strips the response function from the list of handler 328 // arguments. 329 template <typename FnT> class AsyncHandlerTraits; 330 331 template <typename ResultT, typename... ArgTs> 332 class AsyncHandlerTraits<Error(std::function<Error(Expected<ResultT>)>, ArgTs...)> { 333 public: 334 using Type = Error(ArgTs...); 335 using ResultType = Expected<ResultT>; 336 }; 337 338 template <typename... ArgTs> 339 class AsyncHandlerTraits<Error(std::function<Error(Error)>, ArgTs...)> { 340 public: 341 using Type = Error(ArgTs...); 342 using ResultType = Error; 343 }; 344 345 template <typename ResponseHandlerT, typename... ArgTs> 346 class AsyncHandlerTraits<Error(ResponseHandlerT, ArgTs...)> : 347 public AsyncHandlerTraits<Error(typename std::decay<ResponseHandlerT>::type, 348 ArgTs...)> {}; 349 350 // This template class provides utilities related to RPC function handlers. 351 // The base case applies to non-function types (the template class is 352 // specialized for function types) and inherits from the appropriate 353 // speciilization for the given non-function type's call operator. 354 template <typename HandlerT> 355 class HandlerTraits : public HandlerTraits<decltype( 356 &std::remove_reference<HandlerT>::type::operator())> { 357 }; 358 359 // Traits for handlers with a given function type. 360 template <typename RetT, typename... ArgTs> 361 class HandlerTraits<RetT(ArgTs...)> { 362 public: 363 // Function type of the handler. 364 using Type = RetT(ArgTs...); 365 366 // Return type of the handler. 367 using ReturnType = RetT; 368 369 // Call the given handler with the given arguments. 370 template <typename HandlerT, typename... TArgTs> 371 static typename WrappedHandlerReturn<RetT>::Type 372 unpackAndRun(HandlerT &Handler, std::tuple<TArgTs...> &Args) { 373 return unpackAndRunHelper(Handler, Args, 374 llvm::index_sequence_for<TArgTs...>()); 375 } 376 377 // Call the given handler with the given arguments. 378 template <typename HandlerT, typename ResponderT, typename... TArgTs> 379 static Error unpackAndRunAsync(HandlerT &Handler, ResponderT &Responder, 380 std::tuple<TArgTs...> &Args) { 381 return unpackAndRunAsyncHelper(Handler, Responder, Args, 382 llvm::index_sequence_for<TArgTs...>()); 383 } 384 385 // Call the given handler with the given arguments. 386 template <typename HandlerT> 387 static typename std::enable_if< 388 std::is_void<typename HandlerTraits<HandlerT>::ReturnType>::value, 389 Error>::type 390 run(HandlerT &Handler, ArgTs &&... Args) { 391 Handler(std::move(Args)...); 392 return Error::success(); 393 } 394 395 template <typename HandlerT, typename... TArgTs> 396 static typename std::enable_if< 397 !std::is_void<typename HandlerTraits<HandlerT>::ReturnType>::value, 398 typename HandlerTraits<HandlerT>::ReturnType>::type 399 run(HandlerT &Handler, TArgTs... Args) { 400 return Handler(std::move(Args)...); 401 } 402 403 // Serialize arguments to the channel. 404 template <typename ChannelT, typename... CArgTs> 405 static Error serializeArgs(ChannelT &C, const CArgTs... CArgs) { 406 return SequenceSerialization<ChannelT, ArgTs...>::serialize(C, CArgs...); 407 } 408 409 // Deserialize arguments from the channel. 410 template <typename ChannelT, typename... CArgTs> 411 static Error deserializeArgs(ChannelT &C, std::tuple<CArgTs...> &Args) { 412 return deserializeArgsHelper(C, Args, 413 llvm::index_sequence_for<CArgTs...>()); 414 } 415 416 private: 417 template <typename ChannelT, typename... CArgTs, size_t... Indexes> 418 static Error deserializeArgsHelper(ChannelT &C, std::tuple<CArgTs...> &Args, 419 llvm::index_sequence<Indexes...> _) { 420 return SequenceSerialization<ChannelT, ArgTs...>::deserialize( 421 C, std::get<Indexes>(Args)...); 422 } 423 424 template <typename HandlerT, typename ArgTuple, size_t... Indexes> 425 static typename WrappedHandlerReturn< 426 typename HandlerTraits<HandlerT>::ReturnType>::Type 427 unpackAndRunHelper(HandlerT &Handler, ArgTuple &Args, 428 llvm::index_sequence<Indexes...>) { 429 return run(Handler, std::move(std::get<Indexes>(Args))...); 430 } 431 432 433 template <typename HandlerT, typename ResponderT, typename ArgTuple, 434 size_t... Indexes> 435 static typename WrappedHandlerReturn< 436 typename HandlerTraits<HandlerT>::ReturnType>::Type 437 unpackAndRunAsyncHelper(HandlerT &Handler, ResponderT &Responder, 438 ArgTuple &Args, 439 llvm::index_sequence<Indexes...>) { 440 return run(Handler, Responder, std::move(std::get<Indexes>(Args))...); 441 } 442 }; 443 444 // Handler traits for free functions. 445 template <typename RetT, typename... ArgTs> 446 class HandlerTraits<RetT(*)(ArgTs...)> 447 : public HandlerTraits<RetT(ArgTs...)> {}; 448 449 // Handler traits for class methods (especially call operators for lambdas). 450 template <typename Class, typename RetT, typename... ArgTs> 451 class HandlerTraits<RetT (Class::*)(ArgTs...)> 452 : public HandlerTraits<RetT(ArgTs...)> {}; 453 454 // Handler traits for const class methods (especially call operators for 455 // lambdas). 456 template <typename Class, typename RetT, typename... ArgTs> 457 class HandlerTraits<RetT (Class::*)(ArgTs...) const> 458 : public HandlerTraits<RetT(ArgTs...)> {}; 459 460 // Utility to peel the Expected wrapper off a response handler error type. 461 template <typename HandlerT> class ResponseHandlerArg; 462 463 template <typename ArgT> class ResponseHandlerArg<Error(Expected<ArgT>)> { 464 public: 465 using ArgType = Expected<ArgT>; 466 using UnwrappedArgType = ArgT; 467 }; 468 469 template <typename ArgT> 470 class ResponseHandlerArg<ErrorSuccess(Expected<ArgT>)> { 471 public: 472 using ArgType = Expected<ArgT>; 473 using UnwrappedArgType = ArgT; 474 }; 475 476 template <> class ResponseHandlerArg<Error(Error)> { 477 public: 478 using ArgType = Error; 479 }; 480 481 template <> class ResponseHandlerArg<ErrorSuccess(Error)> { 482 public: 483 using ArgType = Error; 484 }; 485 486 // ResponseHandler represents a handler for a not-yet-received function call 487 // result. 488 template <typename ChannelT> class ResponseHandler { 489 public: 490 virtual ~ResponseHandler() {} 491 492 // Reads the function result off the wire and acts on it. The meaning of 493 // "act" will depend on how this method is implemented in any given 494 // ResponseHandler subclass but could, for example, mean running a 495 // user-specified handler or setting a promise value. 496 virtual Error handleResponse(ChannelT &C) = 0; 497 498 // Abandons this outstanding result. 499 virtual void abandon() = 0; 500 501 // Create an error instance representing an abandoned response. 502 static Error createAbandonedResponseError() { 503 return errorCodeToError(orcError(OrcErrorCode::RPCResponseAbandoned)); 504 } 505 }; 506 507 // ResponseHandler subclass for RPC functions with non-void returns. 508 template <typename ChannelT, typename FuncRetT, typename HandlerT> 509 class ResponseHandlerImpl : public ResponseHandler<ChannelT> { 510 public: 511 ResponseHandlerImpl(HandlerT Handler) : Handler(std::move(Handler)) {} 512 513 // Handle the result by deserializing it from the channel then passing it 514 // to the user defined handler. 515 Error handleResponse(ChannelT &C) override { 516 using UnwrappedArgType = typename ResponseHandlerArg< 517 typename HandlerTraits<HandlerT>::Type>::UnwrappedArgType; 518 UnwrappedArgType Result; 519 if (auto Err = 520 SerializationTraits<ChannelT, FuncRetT, 521 UnwrappedArgType>::deserialize(C, Result)) 522 return Err; 523 if (auto Err = C.endReceiveMessage()) 524 return Err; 525 return Handler(std::move(Result)); 526 } 527 528 // Abandon this response by calling the handler with an 'abandoned response' 529 // error. 530 void abandon() override { 531 if (auto Err = Handler(this->createAbandonedResponseError())) { 532 // Handlers should not fail when passed an abandoned response error. 533 report_fatal_error(std::move(Err)); 534 } 535 } 536 537 private: 538 HandlerT Handler; 539 }; 540 541 // ResponseHandler subclass for RPC functions with void returns. 542 template <typename ChannelT, typename HandlerT> 543 class ResponseHandlerImpl<ChannelT, void, HandlerT> 544 : public ResponseHandler<ChannelT> { 545 public: 546 ResponseHandlerImpl(HandlerT Handler) : Handler(std::move(Handler)) {} 547 548 // Handle the result (no actual value, just a notification that the function 549 // has completed on the remote end) by calling the user-defined handler with 550 // Error::success(). 551 Error handleResponse(ChannelT &C) override { 552 if (auto Err = C.endReceiveMessage()) 553 return Err; 554 return Handler(Error::success()); 555 } 556 557 // Abandon this response by calling the handler with an 'abandoned response' 558 // error. 559 void abandon() override { 560 if (auto Err = Handler(this->createAbandonedResponseError())) { 561 // Handlers should not fail when passed an abandoned response error. 562 report_fatal_error(std::move(Err)); 563 } 564 } 565 566 private: 567 HandlerT Handler; 568 }; 569 570 // Create a ResponseHandler from a given user handler. 571 template <typename ChannelT, typename FuncRetT, typename HandlerT> 572 std::unique_ptr<ResponseHandler<ChannelT>> createResponseHandler(HandlerT H) { 573 return llvm::make_unique<ResponseHandlerImpl<ChannelT, FuncRetT, HandlerT>>( 574 std::move(H)); 575 } 576 577 // Helper for wrapping member functions up as functors. This is useful for 578 // installing methods as result handlers. 579 template <typename ClassT, typename RetT, typename... ArgTs> 580 class MemberFnWrapper { 581 public: 582 using MethodT = RetT (ClassT::*)(ArgTs...); 583 MemberFnWrapper(ClassT &Instance, MethodT Method) 584 : Instance(Instance), Method(Method) {} 585 RetT operator()(ArgTs &&... Args) { 586 return (Instance.*Method)(std::move(Args)...); 587 } 588 589 private: 590 ClassT &Instance; 591 MethodT Method; 592 }; 593 594 // Helper that provides a Functor for deserializing arguments. 595 template <typename... ArgTs> class ReadArgs { 596 public: 597 Error operator()() { return Error::success(); } 598 }; 599 600 template <typename ArgT, typename... ArgTs> 601 class ReadArgs<ArgT, ArgTs...> : public ReadArgs<ArgTs...> { 602 public: 603 ReadArgs(ArgT &Arg, ArgTs &... Args) 604 : ReadArgs<ArgTs...>(Args...), Arg(Arg) {} 605 606 Error operator()(ArgT &ArgVal, ArgTs &... ArgVals) { 607 this->Arg = std::move(ArgVal); 608 return ReadArgs<ArgTs...>::operator()(ArgVals...); 609 } 610 611 private: 612 ArgT &Arg; 613 }; 614 615 // Manage sequence numbers. 616 template <typename SequenceNumberT> class SequenceNumberManager { 617 public: 618 // Reset, making all sequence numbers available. 619 void reset() { 620 std::lock_guard<std::mutex> Lock(SeqNoLock); 621 NextSequenceNumber = 0; 622 FreeSequenceNumbers.clear(); 623 } 624 625 // Get the next available sequence number. Will re-use numbers that have 626 // been released. 627 SequenceNumberT getSequenceNumber() { 628 std::lock_guard<std::mutex> Lock(SeqNoLock); 629 if (FreeSequenceNumbers.empty()) 630 return NextSequenceNumber++; 631 auto SequenceNumber = FreeSequenceNumbers.back(); 632 FreeSequenceNumbers.pop_back(); 633 return SequenceNumber; 634 } 635 636 // Release a sequence number, making it available for re-use. 637 void releaseSequenceNumber(SequenceNumberT SequenceNumber) { 638 std::lock_guard<std::mutex> Lock(SeqNoLock); 639 FreeSequenceNumbers.push_back(SequenceNumber); 640 } 641 642 private: 643 std::mutex SeqNoLock; 644 SequenceNumberT NextSequenceNumber = 0; 645 std::vector<SequenceNumberT> FreeSequenceNumbers; 646 }; 647 648 // Checks that predicate P holds for each corresponding pair of type arguments 649 // from T1 and T2 tuple. 650 template <template <class, class> class P, typename T1Tuple, typename T2Tuple> 651 class RPCArgTypeCheckHelper; 652 653 template <template <class, class> class P> 654 class RPCArgTypeCheckHelper<P, std::tuple<>, std::tuple<>> { 655 public: 656 static const bool value = true; 657 }; 658 659 template <template <class, class> class P, typename T, typename... Ts, 660 typename U, typename... Us> 661 class RPCArgTypeCheckHelper<P, std::tuple<T, Ts...>, std::tuple<U, Us...>> { 662 public: 663 static const bool value = 664 P<T, U>::value && 665 RPCArgTypeCheckHelper<P, std::tuple<Ts...>, std::tuple<Us...>>::value; 666 }; 667 668 template <template <class, class> class P, typename T1Sig, typename T2Sig> 669 class RPCArgTypeCheck { 670 public: 671 using T1Tuple = typename FunctionArgsTuple<T1Sig>::Type; 672 using T2Tuple = typename FunctionArgsTuple<T2Sig>::Type; 673 674 static_assert(std::tuple_size<T1Tuple>::value >= 675 std::tuple_size<T2Tuple>::value, 676 "Too many arguments to RPC call"); 677 static_assert(std::tuple_size<T1Tuple>::value <= 678 std::tuple_size<T2Tuple>::value, 679 "Too few arguments to RPC call"); 680 681 static const bool value = RPCArgTypeCheckHelper<P, T1Tuple, T2Tuple>::value; 682 }; 683 684 template <typename ChannelT, typename WireT, typename ConcreteT> 685 class CanSerialize { 686 private: 687 using S = SerializationTraits<ChannelT, WireT, ConcreteT>; 688 689 template <typename T> 690 static std::true_type 691 check(typename std::enable_if< 692 std::is_same<decltype(T::serialize(std::declval<ChannelT &>(), 693 std::declval<const ConcreteT &>())), 694 Error>::value, 695 void *>::type); 696 697 template <typename> static std::false_type check(...); 698 699 public: 700 static const bool value = decltype(check<S>(0))::value; 701 }; 702 703 template <typename ChannelT, typename WireT, typename ConcreteT> 704 class CanDeserialize { 705 private: 706 using S = SerializationTraits<ChannelT, WireT, ConcreteT>; 707 708 template <typename T> 709 static std::true_type 710 check(typename std::enable_if< 711 std::is_same<decltype(T::deserialize(std::declval<ChannelT &>(), 712 std::declval<ConcreteT &>())), 713 Error>::value, 714 void *>::type); 715 716 template <typename> static std::false_type check(...); 717 718 public: 719 static const bool value = decltype(check<S>(0))::value; 720 }; 721 722 /// Contains primitive utilities for defining, calling and handling calls to 723 /// remote procedures. ChannelT is a bidirectional stream conforming to the 724 /// RPCChannel interface (see RPCChannel.h), FunctionIdT is a procedure 725 /// identifier type that must be serializable on ChannelT, and SequenceNumberT 726 /// is an integral type that will be used to number in-flight function calls. 727 /// 728 /// These utilities support the construction of very primitive RPC utilities. 729 /// Their intent is to ensure correct serialization and deserialization of 730 /// procedure arguments, and to keep the client and server's view of the API in 731 /// sync. 732 template <typename ImplT, typename ChannelT, typename FunctionIdT, 733 typename SequenceNumberT> 734 class RPCEndpointBase { 735 protected: 736 class OrcRPCInvalid : public Function<OrcRPCInvalid, void()> { 737 public: 738 static const char *getName() { return "__orc_rpc$invalid"; } 739 }; 740 741 class OrcRPCResponse : public Function<OrcRPCResponse, void()> { 742 public: 743 static const char *getName() { return "__orc_rpc$response"; } 744 }; 745 746 class OrcRPCNegotiate 747 : public Function<OrcRPCNegotiate, FunctionIdT(std::string)> { 748 public: 749 static const char *getName() { return "__orc_rpc$negotiate"; } 750 }; 751 752 // Helper predicate for testing for the presence of SerializeTraits 753 // serializers. 754 template <typename WireT, typename ConcreteT> 755 class CanSerializeCheck : detail::CanSerialize<ChannelT, WireT, ConcreteT> { 756 public: 757 using detail::CanSerialize<ChannelT, WireT, ConcreteT>::value; 758 759 static_assert(value, "Missing serializer for argument (Can't serialize the " 760 "first template type argument of CanSerializeCheck " 761 "from the second)"); 762 }; 763 764 // Helper predicate for testing for the presence of SerializeTraits 765 // deserializers. 766 template <typename WireT, typename ConcreteT> 767 class CanDeserializeCheck 768 : detail::CanDeserialize<ChannelT, WireT, ConcreteT> { 769 public: 770 using detail::CanDeserialize<ChannelT, WireT, ConcreteT>::value; 771 772 static_assert(value, "Missing deserializer for argument (Can't deserialize " 773 "the second template type argument of " 774 "CanDeserializeCheck from the first)"); 775 }; 776 777 public: 778 /// Construct an RPC instance on a channel. 779 RPCEndpointBase(ChannelT &C, bool LazyAutoNegotiation) 780 : C(C), LazyAutoNegotiation(LazyAutoNegotiation) { 781 // Hold ResponseId in a special variable, since we expect Response to be 782 // called relatively frequently, and want to avoid the map lookup. 783 ResponseId = FnIdAllocator.getResponseId(); 784 RemoteFunctionIds[OrcRPCResponse::getPrototype()] = ResponseId; 785 786 // Register the negotiate function id and handler. 787 auto NegotiateId = FnIdAllocator.getNegotiateId(); 788 RemoteFunctionIds[OrcRPCNegotiate::getPrototype()] = NegotiateId; 789 Handlers[NegotiateId] = wrapHandler<OrcRPCNegotiate>( 790 [this](const std::string &Name) { return handleNegotiate(Name); }); 791 } 792 793 794 /// Negotiate a function id for Func with the other end of the channel. 795 template <typename Func> Error negotiateFunction(bool Retry = false) { 796 return getRemoteFunctionId<Func>(true, Retry).takeError(); 797 } 798 799 /// Append a call Func, does not call send on the channel. 800 /// The first argument specifies a user-defined handler to be run when the 801 /// function returns. The handler should take an Expected<Func::ReturnType>, 802 /// or an Error (if Func::ReturnType is void). The handler will be called 803 /// with an error if the return value is abandoned due to a channel error. 804 template <typename Func, typename HandlerT, typename... ArgTs> 805 Error appendCallAsync(HandlerT Handler, const ArgTs &... Args) { 806 807 static_assert( 808 detail::RPCArgTypeCheck<CanSerializeCheck, typename Func::Type, 809 void(ArgTs...)>::value, 810 ""); 811 812 // Look up the function ID. 813 FunctionIdT FnId; 814 if (auto FnIdOrErr = getRemoteFunctionId<Func>(LazyAutoNegotiation, false)) 815 FnId = *FnIdOrErr; 816 else { 817 // This isn't a channel error so we don't want to abandon other pending 818 // responses, but we still need to run the user handler with an error to 819 // let them know the call failed. 820 if (auto Err = Handler(errorCodeToError( 821 orcError(OrcErrorCode::UnknownRPCFunction)))) 822 report_fatal_error(std::move(Err)); 823 return FnIdOrErr.takeError(); 824 } 825 826 SequenceNumberT SeqNo; // initialized in locked scope below. 827 { 828 // Lock the pending responses map and sequence number manager. 829 std::lock_guard<std::mutex> Lock(ResponsesMutex); 830 831 // Allocate a sequence number. 832 SeqNo = SequenceNumberMgr.getSequenceNumber(); 833 assert(!PendingResponses.count(SeqNo) && 834 "Sequence number already allocated"); 835 836 // Install the user handler. 837 PendingResponses[SeqNo] = 838 detail::createResponseHandler<ChannelT, typename Func::ReturnType>( 839 std::move(Handler)); 840 } 841 842 // Open the function call message. 843 if (auto Err = C.startSendMessage(FnId, SeqNo)) { 844 abandonPendingResponses(); 845 return Err; 846 } 847 848 // Serialize the call arguments. 849 if (auto Err = detail::HandlerTraits<typename Func::Type>::serializeArgs( 850 C, Args...)) { 851 abandonPendingResponses(); 852 return Err; 853 } 854 855 // Close the function call messagee. 856 if (auto Err = C.endSendMessage()) { 857 abandonPendingResponses(); 858 return Err; 859 } 860 861 return Error::success(); 862 } 863 864 Error sendAppendedCalls() { return C.send(); }; 865 866 template <typename Func, typename HandlerT, typename... ArgTs> 867 Error callAsync(HandlerT Handler, const ArgTs &... Args) { 868 if (auto Err = appendCallAsync<Func>(std::move(Handler), Args...)) 869 return Err; 870 return C.send(); 871 } 872 873 /// Handle one incoming call. 874 Error handleOne() { 875 FunctionIdT FnId; 876 SequenceNumberT SeqNo; 877 if (auto Err = C.startReceiveMessage(FnId, SeqNo)) { 878 abandonPendingResponses(); 879 return Err; 880 } 881 if (FnId == ResponseId) 882 return handleResponse(SeqNo); 883 auto I = Handlers.find(FnId); 884 if (I != Handlers.end()) 885 return I->second(C, SeqNo); 886 887 // else: No handler found. Report error to client? 888 return errorCodeToError(orcError(OrcErrorCode::UnexpectedRPCCall)); 889 } 890 891 /// Helper for handling setter procedures - this method returns a functor that 892 /// sets the variables referred to by Args... to values deserialized from the 893 /// channel. 894 /// E.g. 895 /// 896 /// typedef Function<0, bool, int> Func1; 897 /// 898 /// ... 899 /// bool B; 900 /// int I; 901 /// if (auto Err = expect<Func1>(Channel, readArgs(B, I))) 902 /// /* Handle Args */ ; 903 /// 904 template <typename... ArgTs> 905 static detail::ReadArgs<ArgTs...> readArgs(ArgTs &... Args) { 906 return detail::ReadArgs<ArgTs...>(Args...); 907 } 908 909 /// Abandon all outstanding result handlers. 910 /// 911 /// This will call all currently registered result handlers to receive an 912 /// "abandoned" error as their argument. This is used internally by the RPC 913 /// in error situations, but can also be called directly by clients who are 914 /// disconnecting from the remote and don't or can't expect responses to their 915 /// outstanding calls. (Especially for outstanding blocking calls, calling 916 /// this function may be necessary to avoid dead threads). 917 void abandonPendingResponses() { 918 // Lock the pending responses map and sequence number manager. 919 std::lock_guard<std::mutex> Lock(ResponsesMutex); 920 921 for (auto &KV : PendingResponses) 922 KV.second->abandon(); 923 PendingResponses.clear(); 924 SequenceNumberMgr.reset(); 925 } 926 927 /// Remove the handler for the given function. 928 /// A handler must currently be registered for this function. 929 template <typename Func> 930 void removeHandler() { 931 auto IdItr = LocalFunctionIds.find(Func::getPrototype()); 932 assert(IdItr != LocalFunctionIds.end() && 933 "Function does not have a registered handler"); 934 auto HandlerItr = Handlers.find(IdItr->second); 935 assert(HandlerItr != Handlers.end() && 936 "Function does not have a registered handler"); 937 Handlers.erase(HandlerItr); 938 } 939 940 /// Clear all handlers. 941 void clearHandlers() { 942 Handlers.clear(); 943 } 944 945 protected: 946 947 FunctionIdT getInvalidFunctionId() const { 948 return FnIdAllocator.getInvalidId(); 949 } 950 951 /// Add the given handler to the handler map and make it available for 952 /// autonegotiation and execution. 953 template <typename Func, typename HandlerT> 954 void addHandlerImpl(HandlerT Handler) { 955 956 static_assert(detail::RPCArgTypeCheck< 957 CanDeserializeCheck, typename Func::Type, 958 typename detail::HandlerTraits<HandlerT>::Type>::value, 959 ""); 960 961 FunctionIdT NewFnId = FnIdAllocator.template allocate<Func>(); 962 LocalFunctionIds[Func::getPrototype()] = NewFnId; 963 Handlers[NewFnId] = wrapHandler<Func>(std::move(Handler)); 964 } 965 966 template <typename Func, typename HandlerT> 967 void addAsyncHandlerImpl(HandlerT Handler) { 968 969 static_assert(detail::RPCArgTypeCheck< 970 CanDeserializeCheck, typename Func::Type, 971 typename detail::AsyncHandlerTraits< 972 typename detail::HandlerTraits<HandlerT>::Type 973 >::Type>::value, 974 ""); 975 976 FunctionIdT NewFnId = FnIdAllocator.template allocate<Func>(); 977 LocalFunctionIds[Func::getPrototype()] = NewFnId; 978 Handlers[NewFnId] = wrapAsyncHandler<Func>(std::move(Handler)); 979 } 980 981 Error handleResponse(SequenceNumberT SeqNo) { 982 using Handler = typename decltype(PendingResponses)::mapped_type; 983 Handler PRHandler; 984 985 { 986 // Lock the pending responses map and sequence number manager. 987 std::unique_lock<std::mutex> Lock(ResponsesMutex); 988 auto I = PendingResponses.find(SeqNo); 989 990 if (I != PendingResponses.end()) { 991 PRHandler = std::move(I->second); 992 PendingResponses.erase(I); 993 SequenceNumberMgr.releaseSequenceNumber(SeqNo); 994 } else { 995 // Unlock the pending results map to prevent recursive lock. 996 Lock.unlock(); 997 abandonPendingResponses(); 998 return errorCodeToError(orcError(OrcErrorCode::UnexpectedRPCResponse)); 999 } 1000 } 1001 1002 assert(PRHandler && 1003 "If we didn't find a response handler we should have bailed out"); 1004 1005 if (auto Err = PRHandler->handleResponse(C)) { 1006 abandonPendingResponses(); 1007 return Err; 1008 } 1009 1010 return Error::success(); 1011 } 1012 1013 FunctionIdT handleNegotiate(const std::string &Name) { 1014 auto I = LocalFunctionIds.find(Name); 1015 if (I == LocalFunctionIds.end()) 1016 return getInvalidFunctionId(); 1017 return I->second; 1018 } 1019 1020 // Find the remote FunctionId for the given function. 1021 template <typename Func> 1022 Expected<FunctionIdT> getRemoteFunctionId(bool NegotiateIfNotInMap, 1023 bool NegotiateIfInvalid) { 1024 bool DoNegotiate; 1025 1026 // Check if we already have a function id... 1027 auto I = RemoteFunctionIds.find(Func::getPrototype()); 1028 if (I != RemoteFunctionIds.end()) { 1029 // If it's valid there's nothing left to do. 1030 if (I->second != getInvalidFunctionId()) 1031 return I->second; 1032 DoNegotiate = NegotiateIfInvalid; 1033 } else 1034 DoNegotiate = NegotiateIfNotInMap; 1035 1036 // We don't have a function id for Func yet, but we're allowed to try to 1037 // negotiate one. 1038 if (DoNegotiate) { 1039 auto &Impl = static_cast<ImplT &>(*this); 1040 if (auto RemoteIdOrErr = 1041 Impl.template callB<OrcRPCNegotiate>(Func::getPrototype())) { 1042 RemoteFunctionIds[Func::getPrototype()] = *RemoteIdOrErr; 1043 if (*RemoteIdOrErr == getInvalidFunctionId()) 1044 return make_error<RPCFunctionNotSupported>(Func::getPrototype()); 1045 return *RemoteIdOrErr; 1046 } else 1047 return RemoteIdOrErr.takeError(); 1048 } 1049 1050 // No key was available in the map and we weren't allowed to try to 1051 // negotiate one, so return an unknown function error. 1052 return make_error<RPCFunctionNotSupported>(Func::getPrototype()); 1053 } 1054 1055 using WrappedHandlerFn = std::function<Error(ChannelT &, SequenceNumberT)>; 1056 1057 // Wrap the given user handler in the necessary argument-deserialization code, 1058 // result-serialization code, and call to the launch policy (if present). 1059 template <typename Func, typename HandlerT> 1060 WrappedHandlerFn wrapHandler(HandlerT Handler) { 1061 return [this, Handler](ChannelT &Channel, 1062 SequenceNumberT SeqNo) mutable -> Error { 1063 // Start by deserializing the arguments. 1064 using ArgsTuple = 1065 typename detail::FunctionArgsTuple< 1066 typename detail::HandlerTraits<HandlerT>::Type>::Type; 1067 auto Args = std::make_shared<ArgsTuple>(); 1068 1069 if (auto Err = 1070 detail::HandlerTraits<typename Func::Type>::deserializeArgs( 1071 Channel, *Args)) 1072 return Err; 1073 1074 // GCC 4.7 and 4.8 incorrectly issue a -Wunused-but-set-variable warning 1075 // for RPCArgs. Void cast RPCArgs to work around this for now. 1076 // FIXME: Remove this workaround once we can assume a working GCC version. 1077 (void)Args; 1078 1079 // End receieve message, unlocking the channel for reading. 1080 if (auto Err = Channel.endReceiveMessage()) 1081 return Err; 1082 1083 using HTraits = detail::HandlerTraits<HandlerT>; 1084 using FuncReturn = typename Func::ReturnType; 1085 return detail::respond<FuncReturn>(Channel, ResponseId, SeqNo, 1086 HTraits::unpackAndRun(Handler, *Args)); 1087 }; 1088 } 1089 1090 // Wrap the given user handler in the necessary argument-deserialization code, 1091 // result-serialization code, and call to the launch policy (if present). 1092 template <typename Func, typename HandlerT> 1093 WrappedHandlerFn wrapAsyncHandler(HandlerT Handler) { 1094 return [this, Handler](ChannelT &Channel, 1095 SequenceNumberT SeqNo) mutable -> Error { 1096 // Start by deserializing the arguments. 1097 using AHTraits = detail::AsyncHandlerTraits< 1098 typename detail::HandlerTraits<HandlerT>::Type>; 1099 using ArgsTuple = 1100 typename detail::FunctionArgsTuple<typename AHTraits::Type>::Type; 1101 auto Args = std::make_shared<ArgsTuple>(); 1102 1103 if (auto Err = 1104 detail::HandlerTraits<typename Func::Type>::deserializeArgs( 1105 Channel, *Args)) 1106 return Err; 1107 1108 // GCC 4.7 and 4.8 incorrectly issue a -Wunused-but-set-variable warning 1109 // for RPCArgs. Void cast RPCArgs to work around this for now. 1110 // FIXME: Remove this workaround once we can assume a working GCC version. 1111 (void)Args; 1112 1113 // End receieve message, unlocking the channel for reading. 1114 if (auto Err = Channel.endReceiveMessage()) 1115 return Err; 1116 1117 using HTraits = detail::HandlerTraits<HandlerT>; 1118 using FuncReturn = typename Func::ReturnType; 1119 auto Responder = 1120 [this, SeqNo](typename AHTraits::ResultType RetVal) -> Error { 1121 return detail::respond<FuncReturn>(C, ResponseId, SeqNo, 1122 std::move(RetVal)); 1123 }; 1124 1125 return HTraits::unpackAndRunAsync(Handler, Responder, *Args); 1126 }; 1127 } 1128 1129 ChannelT &C; 1130 1131 bool LazyAutoNegotiation; 1132 1133 RPCFunctionIdAllocator<FunctionIdT> FnIdAllocator; 1134 1135 FunctionIdT ResponseId; 1136 std::map<std::string, FunctionIdT> LocalFunctionIds; 1137 std::map<const char *, FunctionIdT> RemoteFunctionIds; 1138 1139 std::map<FunctionIdT, WrappedHandlerFn> Handlers; 1140 1141 std::mutex ResponsesMutex; 1142 detail::SequenceNumberManager<SequenceNumberT> SequenceNumberMgr; 1143 std::map<SequenceNumberT, std::unique_ptr<detail::ResponseHandler<ChannelT>>> 1144 PendingResponses; 1145 }; 1146 1147 } // end namespace detail 1148 1149 template <typename ChannelT, typename FunctionIdT = uint32_t, 1150 typename SequenceNumberT = uint32_t> 1151 class MultiThreadedRPCEndpoint 1152 : public detail::RPCEndpointBase< 1153 MultiThreadedRPCEndpoint<ChannelT, FunctionIdT, SequenceNumberT>, 1154 ChannelT, FunctionIdT, SequenceNumberT> { 1155 private: 1156 using BaseClass = 1157 detail::RPCEndpointBase< 1158 MultiThreadedRPCEndpoint<ChannelT, FunctionIdT, SequenceNumberT>, 1159 ChannelT, FunctionIdT, SequenceNumberT>; 1160 1161 public: 1162 MultiThreadedRPCEndpoint(ChannelT &C, bool LazyAutoNegotiation) 1163 : BaseClass(C, LazyAutoNegotiation) {} 1164 1165 /// Add a handler for the given RPC function. 1166 /// This installs the given handler functor for the given RPC Function, and 1167 /// makes the RPC function available for negotiation/calling from the remote. 1168 template <typename Func, typename HandlerT> 1169 void addHandler(HandlerT Handler) { 1170 return this->template addHandlerImpl<Func>(std::move(Handler)); 1171 } 1172 1173 /// Add a class-method as a handler. 1174 template <typename Func, typename ClassT, typename RetT, typename... ArgTs> 1175 void addHandler(ClassT &Object, RetT (ClassT::*Method)(ArgTs...)) { 1176 addHandler<Func>( 1177 detail::MemberFnWrapper<ClassT, RetT, ArgTs...>(Object, Method)); 1178 } 1179 1180 template <typename Func, typename HandlerT> 1181 void addAsyncHandler(HandlerT Handler) { 1182 return this->template addAsyncHandlerImpl<Func>(std::move(Handler)); 1183 } 1184 1185 /// Add a class-method as a handler. 1186 template <typename Func, typename ClassT, typename RetT, typename... ArgTs> 1187 void addAsyncHandler(ClassT &Object, RetT (ClassT::*Method)(ArgTs...)) { 1188 addAsyncHandler<Func>( 1189 detail::MemberFnWrapper<ClassT, RetT, ArgTs...>(Object, Method)); 1190 } 1191 1192 /// Return type for non-blocking call primitives. 1193 template <typename Func> 1194 using NonBlockingCallResult = typename detail::ResultTraits< 1195 typename Func::ReturnType>::ReturnFutureType; 1196 1197 /// Call Func on Channel C. Does not block, does not call send. Returns a pair 1198 /// of a future result and the sequence number assigned to the result. 1199 /// 1200 /// This utility function is primarily used for single-threaded mode support, 1201 /// where the sequence number can be used to wait for the corresponding 1202 /// result. In multi-threaded mode the appendCallNB method, which does not 1203 /// return the sequence numeber, should be preferred. 1204 template <typename Func, typename... ArgTs> 1205 Expected<NonBlockingCallResult<Func>> appendCallNB(const ArgTs &... Args) { 1206 using RTraits = detail::ResultTraits<typename Func::ReturnType>; 1207 using ErrorReturn = typename RTraits::ErrorReturnType; 1208 using ErrorReturnPromise = typename RTraits::ReturnPromiseType; 1209 1210 // FIXME: Stack allocate and move this into the handler once LLVM builds 1211 // with C++14. 1212 auto Promise = std::make_shared<ErrorReturnPromise>(); 1213 auto FutureResult = Promise->get_future(); 1214 1215 if (auto Err = this->template appendCallAsync<Func>( 1216 [Promise](ErrorReturn RetOrErr) { 1217 Promise->set_value(std::move(RetOrErr)); 1218 return Error::success(); 1219 }, 1220 Args...)) { 1221 RTraits::consumeAbandoned(FutureResult.get()); 1222 return std::move(Err); 1223 } 1224 return std::move(FutureResult); 1225 } 1226 1227 /// The same as appendCallNBWithSeq, except that it calls C.send() to 1228 /// flush the channel after serializing the call. 1229 template <typename Func, typename... ArgTs> 1230 Expected<NonBlockingCallResult<Func>> callNB(const ArgTs &... Args) { 1231 auto Result = appendCallNB<Func>(Args...); 1232 if (!Result) 1233 return Result; 1234 if (auto Err = this->C.send()) { 1235 this->abandonPendingResponses(); 1236 detail::ResultTraits<typename Func::ReturnType>::consumeAbandoned( 1237 std::move(Result->get())); 1238 return std::move(Err); 1239 } 1240 return Result; 1241 } 1242 1243 /// Call Func on Channel C. Blocks waiting for a result. Returns an Error 1244 /// for void functions or an Expected<T> for functions returning a T. 1245 /// 1246 /// This function is for use in threaded code where another thread is 1247 /// handling responses and incoming calls. 1248 template <typename Func, typename... ArgTs, 1249 typename AltRetT = typename Func::ReturnType> 1250 typename detail::ResultTraits<AltRetT>::ErrorReturnType 1251 callB(const ArgTs &... Args) { 1252 if (auto FutureResOrErr = callNB<Func>(Args...)) 1253 return FutureResOrErr->get(); 1254 else 1255 return FutureResOrErr.takeError(); 1256 } 1257 1258 /// Handle incoming RPC calls. 1259 Error handlerLoop() { 1260 while (true) 1261 if (auto Err = this->handleOne()) 1262 return Err; 1263 return Error::success(); 1264 } 1265 }; 1266 1267 template <typename ChannelT, typename FunctionIdT = uint32_t, 1268 typename SequenceNumberT = uint32_t> 1269 class SingleThreadedRPCEndpoint 1270 : public detail::RPCEndpointBase< 1271 SingleThreadedRPCEndpoint<ChannelT, FunctionIdT, SequenceNumberT>, 1272 ChannelT, FunctionIdT, SequenceNumberT> { 1273 private: 1274 using BaseClass = 1275 detail::RPCEndpointBase< 1276 SingleThreadedRPCEndpoint<ChannelT, FunctionIdT, SequenceNumberT>, 1277 ChannelT, FunctionIdT, SequenceNumberT>; 1278 1279 public: 1280 SingleThreadedRPCEndpoint(ChannelT &C, bool LazyAutoNegotiation) 1281 : BaseClass(C, LazyAutoNegotiation) {} 1282 1283 template <typename Func, typename HandlerT> 1284 void addHandler(HandlerT Handler) { 1285 return this->template addHandlerImpl<Func>(std::move(Handler)); 1286 } 1287 1288 template <typename Func, typename ClassT, typename RetT, typename... ArgTs> 1289 void addHandler(ClassT &Object, RetT (ClassT::*Method)(ArgTs...)) { 1290 addHandler<Func>( 1291 detail::MemberFnWrapper<ClassT, RetT, ArgTs...>(Object, Method)); 1292 } 1293 1294 template <typename Func, typename HandlerT> 1295 void addAsyncHandler(HandlerT Handler) { 1296 return this->template addAsyncHandlerImpl<Func>(std::move(Handler)); 1297 } 1298 1299 /// Add a class-method as a handler. 1300 template <typename Func, typename ClassT, typename RetT, typename... ArgTs> 1301 void addAsyncHandler(ClassT &Object, RetT (ClassT::*Method)(ArgTs...)) { 1302 addAsyncHandler<Func>( 1303 detail::MemberFnWrapper<ClassT, RetT, ArgTs...>(Object, Method)); 1304 } 1305 1306 template <typename Func, typename... ArgTs, 1307 typename AltRetT = typename Func::ReturnType> 1308 typename detail::ResultTraits<AltRetT>::ErrorReturnType 1309 callB(const ArgTs &... Args) { 1310 bool ReceivedResponse = false; 1311 using ResultType = typename detail::ResultTraits<AltRetT>::ErrorReturnType; 1312 auto Result = detail::ResultTraits<AltRetT>::createBlankErrorReturnValue(); 1313 1314 // We have to 'Check' result (which we know is in a success state at this 1315 // point) so that it can be overwritten in the async handler. 1316 (void)!!Result; 1317 1318 if (auto Err = this->template appendCallAsync<Func>( 1319 [&](ResultType R) { 1320 Result = std::move(R); 1321 ReceivedResponse = true; 1322 return Error::success(); 1323 }, 1324 Args...)) { 1325 detail::ResultTraits<typename Func::ReturnType>::consumeAbandoned( 1326 std::move(Result)); 1327 return std::move(Err); 1328 } 1329 1330 while (!ReceivedResponse) { 1331 if (auto Err = this->handleOne()) { 1332 detail::ResultTraits<typename Func::ReturnType>::consumeAbandoned( 1333 std::move(Result)); 1334 return std::move(Err); 1335 } 1336 } 1337 1338 return Result; 1339 } 1340 }; 1341 1342 /// Asynchronous dispatch for a function on an RPC endpoint. 1343 template <typename RPCClass, typename Func> 1344 class RPCAsyncDispatch { 1345 public: 1346 RPCAsyncDispatch(RPCClass &Endpoint) : Endpoint(Endpoint) {} 1347 1348 template <typename HandlerT, typename... ArgTs> 1349 Error operator()(HandlerT Handler, const ArgTs &... Args) const { 1350 return Endpoint.template appendCallAsync<Func>(std::move(Handler), Args...); 1351 } 1352 1353 private: 1354 RPCClass &Endpoint; 1355 }; 1356 1357 /// Construct an asynchronous dispatcher from an RPC endpoint and a Func. 1358 template <typename Func, typename RPCEndpointT> 1359 RPCAsyncDispatch<RPCEndpointT, Func> rpcAsyncDispatch(RPCEndpointT &Endpoint) { 1360 return RPCAsyncDispatch<RPCEndpointT, Func>(Endpoint); 1361 } 1362 1363 /// \brief Allows a set of asynchrounous calls to be dispatched, and then 1364 /// waited on as a group. 1365 class ParallelCallGroup { 1366 public: 1367 1368 ParallelCallGroup() = default; 1369 ParallelCallGroup(const ParallelCallGroup &) = delete; 1370 ParallelCallGroup &operator=(const ParallelCallGroup &) = delete; 1371 1372 /// \brief Make as asynchronous call. 1373 template <typename AsyncDispatcher, typename HandlerT, typename... ArgTs> 1374 Error call(const AsyncDispatcher &AsyncDispatch, HandlerT Handler, 1375 const ArgTs &... Args) { 1376 // Increment the count of outstanding calls. This has to happen before 1377 // we invoke the call, as the handler may (depending on scheduling) 1378 // be run immediately on another thread, and we don't want the decrement 1379 // in the wrapped handler below to run before the increment. 1380 { 1381 std::unique_lock<std::mutex> Lock(M); 1382 ++NumOutstandingCalls; 1383 } 1384 1385 // Wrap the user handler in a lambda that will decrement the 1386 // outstanding calls count, then poke the condition variable. 1387 using ArgType = typename detail::ResponseHandlerArg< 1388 typename detail::HandlerTraits<HandlerT>::Type>::ArgType; 1389 // FIXME: Move handler into wrapped handler once we have C++14. 1390 auto WrappedHandler = [this, Handler](ArgType Arg) { 1391 auto Err = Handler(std::move(Arg)); 1392 std::unique_lock<std::mutex> Lock(M); 1393 --NumOutstandingCalls; 1394 CV.notify_all(); 1395 return Err; 1396 }; 1397 1398 return AsyncDispatch(std::move(WrappedHandler), Args...); 1399 } 1400 1401 /// \brief Blocks until all calls have been completed and their return value 1402 /// handlers run. 1403 void wait() { 1404 std::unique_lock<std::mutex> Lock(M); 1405 while (NumOutstandingCalls > 0) 1406 CV.wait(Lock); 1407 } 1408 1409 private: 1410 std::mutex M; 1411 std::condition_variable CV; 1412 uint32_t NumOutstandingCalls = 0; 1413 }; 1414 1415 /// @brief Convenience class for grouping RPC Functions into APIs that can be 1416 /// negotiated as a block. 1417 /// 1418 template <typename... Funcs> 1419 class APICalls { 1420 public: 1421 1422 /// @brief Test whether this API contains Function F. 1423 template <typename F> 1424 class Contains { 1425 public: 1426 static const bool value = false; 1427 }; 1428 1429 /// @brief Negotiate all functions in this API. 1430 template <typename RPCEndpoint> 1431 static Error negotiate(RPCEndpoint &R) { 1432 return Error::success(); 1433 } 1434 }; 1435 1436 template <typename Func, typename... Funcs> 1437 class APICalls<Func, Funcs...> { 1438 public: 1439 1440 template <typename F> 1441 class Contains { 1442 public: 1443 static const bool value = std::is_same<F, Func>::value | 1444 APICalls<Funcs...>::template Contains<F>::value; 1445 }; 1446 1447 template <typename RPCEndpoint> 1448 static Error negotiate(RPCEndpoint &R) { 1449 if (auto Err = R.template negotiateFunction<Func>()) 1450 return Err; 1451 return APICalls<Funcs...>::negotiate(R); 1452 } 1453 1454 }; 1455 1456 template <typename... InnerFuncs, typename... Funcs> 1457 class APICalls<APICalls<InnerFuncs...>, Funcs...> { 1458 public: 1459 1460 template <typename F> 1461 class Contains { 1462 public: 1463 static const bool value = 1464 APICalls<InnerFuncs...>::template Contains<F>::value | 1465 APICalls<Funcs...>::template Contains<F>::value; 1466 }; 1467 1468 template <typename RPCEndpoint> 1469 static Error negotiate(RPCEndpoint &R) { 1470 if (auto Err = APICalls<InnerFuncs...>::negotiate(R)) 1471 return Err; 1472 return APICalls<Funcs...>::negotiate(R); 1473 } 1474 1475 }; 1476 1477 } // end namespace rpc 1478 } // end namespace orc 1479 } // end namespace llvm 1480 1481 #endif 1482