当前位置:网站首页>Grpc gateway based on Ocelot

gRPC gateway based on Ocelot

2022-04-23 16:54:00 Tassel 1990

For the original article and the underlying idea (this example adjusts much of that code and makes many improvements), see: "An Ocelot-based gRPC/HTTP gateway", dotNET Cross-Platform blog, CSDN.

Once the gateway is set up, requests flow as follows:

640?wx_fmt=png

Design outline:

1. Periodically monitor a folder that stores .proto files (see DirectoryMonitorBackgroundService).

2. When a file changes, call the protoc tool to generate C# code. (This example adds support for compiling several files in one pass, but it does not yet handle .proto files that reference other .proto files, nor clashes between classes with the same name.)

3. After the code has been generated, use a CSharpCompilation instance to build the corresponding DLL.

4. Load the DLL and reflect over it to obtain the MethodDescriptor objects, then cache them.

5. Re-implement Ocelot's IHttpRequester interface. This interface is responsible for sending the DownstreamRequest to the downstream host over HTTP and obtaining the data before it is forwarded back to the requester.

6. When handling a request, the IHttpRequester implementation checks whether it is a gRPC request; if so, it resolves the service name and method name and looks up the matching MethodDescriptor in the cache.

7. Implement ClientBase<T>; after creating the Channel, create the client instance and then call the gRPC service.

8. After obtaining the data, forward it to the requester.

Some of the concrete code is listed below.

// Purpose: for the changed files it receives, generate C# code, compile it into a DLL, then reflect over the DLL to obtain the MethodDescriptor objects
/// <summary>
/// Event subscriber that turns changed .proto files into gRPC descriptors:
/// it asks <see cref="IProtoGenerateCode"/> to generate C# sources, compiles the sources of each
/// proto file into its own DLL under <c>plugins/.{name}</c>, and hands that DLL to
/// <see cref="IGrpcServiceDescriptor"/>.
/// </summary>
[Serializable]
    public class GrpcCodeGeneraterSubscriber : IEventSubscriber
    {
        private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;
        private readonly ILogger<GrpcCodeGeneraterSubscriber> logger = null;
        private readonly IGrpcServiceDescriptor serviceDescriptor = null;
        private readonly IProtoGenerateCode protoGenerateCode = null;

        public GrpcCodeGeneraterSubscriber(ILogger<GrpcCodeGeneraterSubscriber> logger, IGrpcServiceDescriptor serviceDescriptor, IProtoGenerateCode protoGenerateCode)
        {
            this.logger = logger;
            this.serviceDescriptor = serviceDescriptor;
            this.protoGenerateCode = protoGenerateCode;
        }

        /// <summary>
        /// Handles the "GrpcCodeGenerater" event. The event payload is expected to be a
        /// <c>string[]</c> of .proto file paths; any other payload (or an empty array) is ignored.
        /// </summary>
        /// <param name="context">Event context whose <c>Source.Payload</c> carries the proto file paths.</param>
        [EventSubscribe("GrpcCodeGenerater")]
        public async Task GrpcCodeGenerater(EventHandlerExecutingContext context)
        {
            var protefileList = context.Source.Payload as string[];
            if (protefileList == null || protefileList.Length == 0)
                return;

            var pluginsDir = Path.Combine(BaseDirectory, "plugins");
            var completed = new List<string>();
            try
            {
                this.protoGenerateCode.GenerateCsharpFromProto(protefileList);
                foreach (var protofilepath in protefileList)
                {
                    var protoName = Path.GetFileNameWithoutExtension(protofilepath);

                    // A proto file that fails to compile must not block the other files of the batch;
                    // GenerateDll has already logged the diagnostics.
                    if (!GenerateDll(protoName))
                        continue;

                    var csharp_out = Path.Combine(BaseDirectory, $"plugins/.{protoName}");
                    // plugin.txt records the last-write time of the proto file the DLL was built from.
                    File.WriteAllText(Path.Combine(csharp_out, $"plugin.txt"), File.GetLastWriteTime(protofilepath).ToString("yyyy-MM-dd HH:mm:ss"));
                    await this.serviceDescriptor.CreateGrpcDescriptorAsync(Path.Combine(csharp_out, $"{protoName}.dll"));
                    completed.Add(Path.GetFileName(protofilepath));
                }

                if (completed.Count > 0)
                    this.logger.LogInformation($" generater dll compeleted:{string.Join(",", completed)} ");
            }
            finally
            {
                // The generated sources are intermediate artifacts: remove them even when generation
                // or compilation failed, so stale files never leak into a later run.
                if (Directory.Exists(pluginsDir))
                {
                    foreach (string sourceFile in Directory.GetFiles(pluginsDir, "*.cs"))
                        File.Delete(sourceFile);
                }
            }
        }

        /// <summary>
        /// Compiles <c>{assemblyName}.cs</c> and <c>{assemblyName}Grpc.cs</c> from the plugins folder
        /// into <c>plugins/.{assemblyName}/{assemblyName}.dll</c>.
        /// </summary>
        /// <param name="assemblyName">Proto file name without extension; also used as the assembly name.</param>
        /// <returns><c>true</c> when the DLL was written; <c>false</c> when there was nothing to compile or compilation failed.</returns>
        private bool GenerateDll(string assemblyName)
        {
            var dirpath = Path.Combine(BaseDirectory, "plugins");
            if (!Directory.Exists(dirpath))
                return false;

            var grpcFileName = string.Concat(assemblyName, "Grpc");
            var trees = new List<SyntaxTree>();
            foreach (var file in Directory.GetFiles(dirpath, "*.cs"))
            {
                // Ordinal, case-insensitive match: ToLower() is culture-sensitive and can mis-compare
                // names under some cultures (e.g. the Turkish dotless i).
                var fileName = Path.GetFileNameWithoutExtension(file);
                var belongsToAssembly = string.Equals(fileName, assemblyName, StringComparison.OrdinalIgnoreCase)
                    || string.Equals(fileName, grpcFileName, StringComparison.OrdinalIgnoreCase);
                if (!belongsToAssembly)
                    continue;
                trees.Add(CSharpSyntaxTree.ParseText(File.ReadAllText(file), encoding: Encoding.UTF8));
            }

            if (trees.Count == 0)
            {
                // Without sources the compilation would "succeed" and produce an empty, useless DLL.
                this.logger.LogWarning($"No generated C# source found for '{assemblyName}'.");
                return false;
            }

            var references = new[]{
                MetadataReference.CreateFromFile(Assembly.Load("netstandard, Version=2.0.0.0").Location),
                MetadataReference.CreateFromFile(Assembly.Load("System.Runtime, Version=0.0.0.0").Location),
                MetadataReference.CreateFromFile(Assembly.Load("System.IO, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
                MetadataReference.CreateFromFile(Assembly.Load("System.Memory, Version=5.0.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51").Location),
                MetadataReference.CreateFromFile(Assembly.Load("System.Threading.Tasks, Version=4.0.10.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
                MetadataReference.CreateFromFile(typeof(object).Assembly.Location),
                MetadataReference.CreateFromFile(typeof(Google.Protobuf.ProtoPreconditions).Assembly.Location),
                MetadataReference.CreateFromFile(typeof(SerializationContext).Assembly.Location),
                MetadataReference.CreateFromFile(typeof(Channel).Assembly.Location)
            };
            var options = new CSharpCompilationOptions(outputKind: OutputKind.DynamicallyLinkedLibrary, optimizationLevel: OptimizationLevel.Debug, generalDiagnosticOption: ReportDiagnostic.Error);
            var compilation = CSharpCompilation.Create(assemblyName, trees, references, options);

            // Emit into memory first: emitting straight to the target path truncates the existing DLL
            // before compiling, so a failed build would destroy the last working assembly.
            using (var peStream = new MemoryStream())
            {
                var result = compilation.Emit(peStream);
                this.logger.Log(result.Success ? LogLevel.Debug : LogLevel.Error, string.Join(",", result.Diagnostics.Select(d => string.Format("[{0}]:{1}({2})", d.Id, d.GetMessage(), d.Location.GetLineSpan().StartLinePosition))));
                if (!result.Success)
                    return false;

                var dlldir = Path.Combine(dirpath, $".{assemblyName}");
                Directory.CreateDirectory(dlldir); // no-op when the directory already exists
                File.WriteAllBytes(Path.Combine(dlldir, $"{assemblyName}.dll"), peStream.ToArray());
                return true;
            }
        }
    }
// Purpose: call the protoc tool to generate C# code; this example generates xxx.cs and xxxGrpc.cs
/// <summary>
/// Runs the bundled <c>protoc</c> compiler (plus the <c>grpc_csharp_plugin</c>) to generate C# sources
/// for a set of .proto files into the <c>plugins</c> folder under the application base directory.
/// </summary>
public class ProtoGenerateCode : IProtoGenerateCode
    {
        /// <summary>Maximum time protoc may run before it is killed.</summary>
        private const int ProtocTimeoutMilliseconds = 5000;

        /// <summary>Maximum time to wait for each redirected output stream to drain.</summary>
        private const int StreamDrainTimeoutMilliseconds = 1000;

        private readonly ILogger<ProtoGenerateCode> Logger = null;
        private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;

        public ProtoGenerateCode(ILogger<ProtoGenerateCode> logger)
        {
            this.Logger = logger;
        }

        /// <summary>
        /// Generates <c>xxx.cs</c> and <c>xxxGrpc.cs</c> for every given proto file in a single protoc run.
        /// </summary>
        /// <param name="protoPath">Paths of the .proto files to compile; nothing happens when null or empty.</param>
        /// <exception cref="InvalidOperationException">protoc exited with a non-zero code (including a timeout kill).</exception>
        public void GenerateCsharpFromProto(params string[] protoPath)
        {
            if (protoPath == null || protoPath.Length == 0)
                return;

            var bin = string.Empty;
            string os;
            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
            {
                os = "windows";
                bin = ".exe";
            }
            else if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
                os = "linux";
            else if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
                os = "macosx";
            else
            {
                Logger.LogError(" This platform does not support grpctools.");
                return;
            }

            // Tool folders are named "{os}_{architecture}", e.g. windows_x64.
            var architecture = RuntimeInformation.OSArchitecture.ToString().ToLowerInvariant();
            var protocPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/protoc{bin}");
            var grpcPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/grpc_csharp_plugin{bin}");
            string outdir = Path.Combine(BaseDirectory, "plugins");
            Directory.CreateDirectory(outdir); // no-op when the directory already exists

            // An ordered list keeps the argument order explicit instead of relying on Dictionary enumeration order.
            var args = new List<string>();
            var protoDirectories = protoPath
                .Select(p => Path.GetDirectoryName(p))
                .Where(d => !string.IsNullOrEmpty(d))
                .Distinct(StringComparer.OrdinalIgnoreCase);
            foreach (var directory in protoDirectories)
                args.Add("--proto_path=" + directory);
            args.Add("--csharp_out=" + outdir);
            args.Add("--plugin=protoc-gen-grpc=" + grpcPath);
            args.Add("--grpc_out=" + outdir);
            args.AddRange(protoPath); // batch, e.g. .\st\supplier.proto .\st\customer.proto

            // Every argument is quoted when needed, so install or proto paths containing spaces keep working.
            var argsValue = string.Join(" ", args.Select(QuoteArgument));
            Logger.LogInformation("Running: " + protocPath + " " + argsValue);

            var exitCode = RunProtoc(protocPath, argsValue, string.Empty, out string stdout, out string stderr);

            // The exit code, not the mere presence of stderr text, decides failure: protoc also writes
            // warnings to stderr, and a killed or crashed process may write nothing at all.
            if (exitCode != 0)
                throw new InvalidOperationException(string.IsNullOrEmpty(stderr) ? $"protoc exited with code {exitCode}." : stderr);
            if (!string.IsNullOrEmpty(stderr))
                Logger.LogWarning("protoc: " + stderr);
        }

        /// <summary>
        /// Quotes a single command-line argument when it contains whitespace or quotes, escaping embedded
        /// quotes and the backslashes that precede a quote. Simple arguments are returned unchanged.
        /// </summary>
        private static string QuoteArgument(string value)
        {
            if (string.IsNullOrEmpty(value))
                return value ?? string.Empty;
            if (value.IndexOfAny(new[] { ' ', '\t', '"' }) < 0)
                return value;

            var sb = new StringBuilder("\"");
            var pendingBackslashes = 0;
            foreach (var c in value)
            {
                if (c == '\\')
                {
                    pendingBackslashes++;
                    continue;
                }
                if (c == '"')
                {
                    // Backslashes before a quote must be doubled, then the quote itself escaped.
                    sb.Append('\\', pendingBackslashes * 2 + 1);
                    sb.Append('"');
                }
                else
                {
                    sb.Append('\\', pendingBackslashes);
                    sb.Append(c);
                }
                pendingBackslashes = 0;
            }
            // Trailing backslashes sit right before the closing quote and must be doubled as well.
            sb.Append('\\', pendingBackslashes * 2);
            sb.Append('"');
            return sb.ToString();
        }

        /// <summary>
        /// Starts protoc, captures its output and waits for it to finish within the timeout.
        /// </summary>
        /// <param name="path">Path of the protoc executable.</param>
        /// <param name="arguments">Already-formatted command line.</param>
        /// <param name="workingDir">Working directory; ignored when null or empty.</param>
        /// <param name="stdout">Captured standard output ("" when it could not be read in time).</param>
        /// <param name="stderr">Captured standard error ("" when it could not be read in time).</param>
        /// <returns>The process exit code, or -1 when the process could not be stopped after the timeout.</returns>
        static int RunProtoc(string path, string arguments, string workingDir, out string stdout, out string stderr)
        {
            using (var proc = new Process())
            {
                var psi = proc.StartInfo;
                psi.FileName = path;
                psi.Arguments = arguments;
                if (!string.IsNullOrEmpty(workingDir))
                    psi.WorkingDirectory = workingDir;
                psi.RedirectStandardError = psi.RedirectStandardOutput = true;
                psi.UseShellExecute = false;
                psi.CreateNoWindow = true;
                proc.Start();

                // Read both streams asynchronously so a full pipe buffer cannot block the child process.
                var stdoutTask = proc.StandardOutput.ReadToEndAsync();
                var stderrTask = proc.StandardError.ReadToEndAsync();

                var exited = proc.WaitForExit(ProtocTimeoutMilliseconds);
                if (!exited)
                {
                    try
                    {
                        proc.Kill();
                    }
                    catch (Exception)
                    {
                        // Best effort: the process may have exited in the meantime or may not be killable.
                    }
                    // Kill is asynchronous; ExitCode may only be read once the process has really exited.
                    exited = proc.WaitForExit(ProtocTimeoutMilliseconds);
                }

                var exitCode = exited ? proc.ExitCode : -1;
                stderr = stdout = "";
                if (stdoutTask.Wait(StreamDrainTimeoutMilliseconds))
                    stdout = stdoutTask.Result;
                if (stderrTask.Wait(StreamDrainTimeoutMilliseconds))
                    stderr = stderrTask.Result;
                return exitCode;
            }
        }
    }
// Re-implements the interface in order to forward RPC calls.
// The scheme is tested with IndexOf("grpc") because the request may also use https.
// 1000 is added to the port because the downstream service also serves HTTP, so its gRPC port is the HTTP port plus 1000.
/// <summary>
/// Ocelot <see cref="IHttpRequester"/> replacement: downstream requests whose scheme contains "grpc"
/// are forwarded as gRPC calls, everything else is forwarded over HTTP.
/// </summary>
[Serializable]
    public class GrpcHttpRequester : IHttpRequester
    {
        /// <summary>The gRPC port is derived from the downstream HTTP port by adding this offset.</summary>
        private const int GrpcPortOffset = 1000;

        private readonly IHttpClientCache _cacheHandlers;
        private readonly IOcelotLogger _logger;
        private readonly IDelegatingHandlerHandlerFactory _factory;
        private readonly IExceptionToErrorMapper _mapper;


        public GrpcHttpRequester(IOcelotLoggerFactory loggerFactory,
            IHttpClientCache cacheHandlers,
            IDelegatingHandlerHandlerFactory factory,
            IExceptionToErrorMapper mapper)
        {
            // Log under this class's own category so entries are attributed to the gRPC requester.
            this._logger = loggerFactory.CreateLogger<GrpcHttpRequester>();
            this._cacheHandlers = cacheHandlers;
            this._factory = factory;
            this._mapper = mapper;
        }

        /// <summary>
        /// Dispatches the downstream request either to the gRPC path or to the plain HTTP path.
        /// </summary>
        /// <param name="httpContext">Current request context carrying Ocelot's downstream request.</param>
        public async Task<Response<HttpResponseMessage>> GetResponse(HttpContext httpContext)
        {
            var scheme = httpContext.Items.DownstreamRequest().Scheme;
            // Ordinal comparison: URI schemes are not linguistic text; a missing scheme means plain HTTP.
            var isGrpc = scheme != null && scheme.IndexOf("grpc", StringComparison.OrdinalIgnoreCase) >= 0;
            if (!isGrpc)
                return await ProcessHttpResponse(httpContext);
            return await ProcessGrpcResponse(httpContext);
        }

        /// <summary>
        /// Forwards the request as a gRPC call. Falls back to HTTP when no gRPC method can be resolved
        /// for the request. <see cref="RpcException"/> and other exceptions are mapped to error responses.
        /// </summary>
        private async Task<Response<HttpResponseMessage>> ProcessGrpcResponse(HttpContext httpContext)
        {
            Channel channel = null;
            try
            {
                GrpcRequestMessage grpcRequestMessage = await GrpcRequestMessage.FromReuqest(httpContext);
                if (grpcRequestMessage == null || grpcRequestMessage.GrpcRequestMethod == null)
                    return await this.ProcessHttpResponse(httpContext);

                var downstreamRequest = httpContext.Items.DownstreamRequest();
                var options = new List<ChannelOption> { new ChannelOption("keepalive_time_ms", 60000) };
                if (!string.IsNullOrEmpty(grpcRequestMessage.RequestVersion))
                    options.Add(new ChannelOption("requestVersion", grpcRequestMessage.RequestVersion));

                // A channel is a long-lived connection. One is still created per request here (no pooling),
                // so it must be shut down afterwards -- see the finally block.
                var grpcPort = Convert.ToInt32(downstreamRequest.Port) + GrpcPortOffset;
                channel = new Channel(downstreamRequest.Host, grpcPort, ChannelCredentials.Insecure, options);
                var client = new MethodDescriptorClient(channel);
                var httpResponseMessage = await client.InvokeAsync(grpcRequestMessage);
                return new OkResponse<HttpResponseMessage>(httpResponseMessage);
            }
            catch (RpcException exception)
            {
                var error = _mapper.Map(exception);
                return new OKButFailResponse<HttpResponseMessage>(error);
            }
            catch (Exception exception)
            {
                var error = _mapper.Map(exception);
                return new ErrorResponse<HttpResponseMessage>(error);
            }
            finally
            {
                if (channel != null)
                {
                    try
                    {
                        // Release the connection; the response has been fully materialized by now.
                        await channel.ShutdownAsync();
                    }
                    catch (Exception exception)
                    {
                        // A failed shutdown must not replace the response that was already produced.
                        _logger.LogWarning($"Failed to shut down gRPC channel: {exception.Message}");
                    }
                }
            }
        }

        /// <summary>
        /// Forwards the request over HTTP using Ocelot's <c>HttpClientBuilder</c>; exceptions are mapped
        /// to an error response and the builder is always saved back to the cache.
        /// </summary>
        private async Task<Response<HttpResponseMessage>> ProcessHttpResponse(HttpContext httpContext)
        {
            var builder = new HttpClientBuilder(_factory, _cacheHandlers, _logger);
            var downstreamRoute = httpContext.Items.DownstreamRoute();
            var downstreamRequest = httpContext.Items.DownstreamRequest();
            var httpClient = builder.Create(downstreamRoute);
            try
            {
                var response = await httpClient.SendAsync(downstreamRequest.ToHttpRequestMessage(), httpContext.RequestAborted);
                return new OkResponse<HttpResponseMessage>(response);
            }
            catch (Exception exception)
            {
                var error = _mapper.Map(exception);
                return new ErrorResponse<HttpResponseMessage>(error);
            }
            finally
            {
                builder.Save();
            }
        }
    }
 // Purpose: performs the gRPC request; see the InvokeAsync method for details
/// <summary>
/// Generic gRPC client that invokes an arbitrary method described by a cached method descriptor:
/// the HTTP request body is deserialized from JSON into the request message, the call is executed
/// according to its <see cref="MethodType"/>, and the result is returned as a JSON HTTP response.
/// </summary>
public class MethodDescriptorClient : ClientBase<MethodDescriptorClient>
    {
        /// <summary>Open generic definition of <c>CallGrpcAsyncCore</c>, looked up once instead of per call.</summary>
        private static readonly System.Reflection.MethodInfo CallGrpcAsyncCoreMethod =
            typeof(MethodDescriptorClient).GetMethod(nameof(CallGrpcAsyncCore), System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic);

        public MethodDescriptorClient(Channel channel)
            : base(channel)
        {
        }

        public MethodDescriptorClient(CallInvoker callInvoker)
            : base(callInvoker)
        {
        }

        public MethodDescriptorClient()
            : base()
        {
        }

        protected MethodDescriptorClient(ClientBaseConfiguration configuration)
            : base(configuration)
        {
        }

        protected override MethodDescriptorClient NewInstance(ClientBaseConfiguration configuration)
        {
            return new MethodDescriptorClient(configuration);
        }

        /// <summary>
        /// Invokes the gRPC method referenced by <paramref name="grpcRequestMessage"/>. The request and
        /// response CLR types are only known at run time, so the generic core method is closed over them
        /// via reflection.
        /// </summary>
        /// <param name="grpcRequestMessage">Resolved request: method descriptor, headers and HTTP body.</param>
        /// <returns>An HTTP response whose body is the JSON-serialized gRPC response.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="grpcRequestMessage"/> is null.</exception>
        /// <exception cref="ArgumentException">The message carries no method descriptor.</exception>
        public Task<HttpResponseMessage> InvokeAsync(GrpcRequestMessage grpcRequestMessage)
        {
            if (grpcRequestMessage == null)
                throw new ArgumentNullException(nameof(grpcRequestMessage));
            var methodDescriptor = grpcRequestMessage.GrpcRequestMethod;
            if (methodDescriptor == null)
                throw new ArgumentException("The request does not reference a gRPC method.", nameof(grpcRequestMessage));

            var closedMethod = CallGrpcAsyncCoreMethod.MakeGenericMethod(methodDescriptor.InputType.ClrType, methodDescriptor.OutputType.ClrType);
            return (Task<HttpResponseMessage>)closedMethod.Invoke(this, new object[] { grpcRequestMessage });
        }

        /// <summary>
        /// Strongly-typed core of <see cref="InvokeAsync"/>: builds the call options and the request
        /// message, then dispatches on the method type (unary / client / server / duplex streaming).
        /// </summary>
        /// <exception cref="ArgumentException">The request body is missing or deserializes to null.</exception>
        /// <exception cref="NotSupportedException">The method type is not one of the four gRPC call types.</exception>
        private async Task<HttpResponseMessage> CallGrpcAsyncCore<TRequest, TResponse>(GrpcRequestMessage grpcRequestMessage) where TRequest : class, IMessage<TRequest> where TResponse : class, IMessage<TResponse>
        {
            CallOptions option = CreateCallOptions(grpcRequestMessage.Headers);
            var rpc = GrpcMethodBuilder<TRequest, TResponse>.GetMethod(grpcRequestMessage.GrpcRequestMethod);

            // Only a JSON string body is supported; a request without content is treated as an empty body.
            var content = grpcRequestMessage.RequestMessage.Content;
            var requestJson = content == null ? string.Empty : await content.ReadAsStringAsync();
            TRequest request = JsonConvert.DeserializeObject<TRequest>(requestJson);
            if (request == null)
                throw new ArgumentException($"The request body is empty or is not a JSON object for '{typeof(TRequest).Name}'.", nameof(grpcRequestMessage));

            // The gateway receives exactly one message per HTTP request, so streaming calls send a single item.
            // NOTE(review): for the streaming cases the response list is passed as ONE params element, so the
            // body appears to serialize as a nested array ([[...]]) -- confirm whether clients rely on that shape.
            switch (rpc.Type)
            {
                case MethodType.Unary:
                    var unary = await AsyncUnaryCall(CallInvoker, rpc, option, request);
                    return await ProcessHttpResponseMessage(unary.Item2, unary.Item1);
                case MethodType.ClientStreaming:
                    var clientStreaming = await AsyncClientStreamingCall(CallInvoker, rpc, option, new[] { request });
                    return await ProcessHttpResponseMessage(clientStreaming.Item2, clientStreaming.Item1);
                case MethodType.ServerStreaming:
                    var serverStreaming = await AsyncServerStreamingCall(CallInvoker, rpc, option, request);
                    return await ProcessHttpResponseMessage(serverStreaming.Item2, serverStreaming.Item1);
                case MethodType.DuplexStreaming:
                    var duplexStreaming = await AsyncDuplexStreamingCall(CallInvoker, rpc, option, new[] { request });
                    return await ProcessHttpResponseMessage(duplexStreaming.Item2, duplexStreaming.Item1);
                default:
                    throw new NotSupportedException($"MethodType '{rpc.Type}' is not supported.");
            }
        }

        /// <summary>
        /// Wraps the gRPC response(s) in a 200 OK HTTP response with a JSON body and copies the textual
        /// response metadata into the HTTP headers.
        /// </summary>
        /// <param name="headers">gRPC response headers; may be null.</param>
        /// <param name="responses">Response value(s) to serialize as a JSON array.</param>
        private Task<HttpResponseMessage> ProcessHttpResponseMessage<TResponse>(Metadata headers, params TResponse[] responses)
        {
            HttpResponseMessage httpResponseMessage = new HttpResponseMessage(HttpStatusCode.OK);
            // The body is JSON, so label it as such instead of the text/plain default.
            httpResponseMessage.Content = new StringContent(JsonConvert.SerializeObject(responses), System.Text.Encoding.UTF8, "application/json");
            if (headers != null)
            {
                foreach (var entry in headers)
                {
                    // Binary ("-bin") entries have no string value; reading Value on them throws.
                    if (entry.IsBinary)
                        continue;
                    // Add() throws on names HttpResponseHeaders considers invalid or misplaced; such an
                    // entry is skipped rather than failing the whole response.
                    httpResponseMessage.Headers.TryAddWithoutValidation(entry.Key, entry.Value);
                }
            }
            return Task.FromResult(httpResponseMessage);
        }

        /// <summary>
        /// Builds the call options by copying the HTTP request headers into gRPC metadata; a "grpc."
        /// fragment in a header name is removed. Only the first value of each header is forwarded.
        /// </summary>
        /// <param name="headers">HTTP request headers; may be null.</param>
        private CallOptions CreateCallOptions(HttpRequestHeaders headers)
        {
            Metadata meta = new Metadata();
            if (headers != null)
            {
                foreach (var entry in headers)
                {
                    var firstValue = entry.Value.FirstOrDefault();
                    // Metadata entries cannot hold a null value; skip headers that carry none.
                    if (firstValue == null)
                        continue;
                    meta.Add(entry.Key.Replace("grpc.", ""), firstValue);
                }
            }
            return new CallOptions(meta);
        }

        /// <summary>
        /// Executes a unary call: one request, one response.
        /// </summary>
        /// <typeparam name="TRequest">Request message type.</typeparam>
        /// <typeparam name="TResponse">Response message type.</typeparam>
        /// <param name="invoker">Call invoker used to start the call.</param>
        /// <param name="method">Method to call.</param>
        /// <param name="option">Call options (metadata).</param>
        /// <param name="request">Request message.</param>
        /// <returns>The response together with the response headers.</returns>
        private async Task<Tuple<TResponse, Metadata>> AsyncUnaryCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request) where TRequest : class where TResponse : class
        {
            using (AsyncUnaryCall<TResponse> call = invoker.AsyncUnaryCall(method, null, option, request))
            {
                return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
            }
        }

        /// <summary>
        /// Executes a client-streaming call: writes every request, completes the stream, then awaits the
        /// single response.
        /// </summary>
        /// <typeparam name="TRequest">Request message type.</typeparam>
        /// <typeparam name="TResponse">Response message type.</typeparam>
        /// <param name="invoker">Call invoker used to start the call.</param>
        /// <param name="method">Method to call.</param>
        /// <param name="option">Call options (metadata).</param>
        /// <param name="requests">Request messages to stream; may be null.</param>
        /// <returns>The response together with the response headers.</returns>
        private async Task<Tuple<TResponse, Metadata>> AsyncClientStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests) where TRequest : class where TResponse : class
        {
            using (AsyncClientStreamingCall<TRequest, TResponse> call = invoker.AsyncClientStreamingCall(method, null, option))
            {
                if (requests != null)
                {
                    foreach (TRequest request in requests)
                        await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
                }
                await call.RequestStream.CompleteAsync().ConfigureAwait(false);
                return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
            }
        }

        /// <summary>
        /// Executes a server-streaming call: sends one request and collects every streamed response.
        /// </summary>
        /// <typeparam name="TRequest">Request message type.</typeparam>
        /// <typeparam name="TResponse">Response message type.</typeparam>
        /// <param name="invoker">Call invoker used to start the call.</param>
        /// <param name="method">Method to call.</param>
        /// <param name="option">Call options (metadata).</param>
        /// <param name="request">Request message.</param>
        /// <returns>All responses, in arrival order, together with the response headers.</returns>
        private async Task<Tuple<IList<TResponse>, Metadata>> AsyncServerStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request) where TRequest : class where TResponse : class
        {
            using (AsyncServerStreamingCall<TResponse> call = invoker.AsyncServerStreamingCall(method, null, option, request))
            {
                IList<TResponse> responses = new List<TResponse>();
                while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
                    responses.Add(call.ResponseStream.Current);
                return Tuple.Create(responses, await call.ResponseHeadersAsync);
            }
        }

        /// <summary>
        /// Executes a duplex-streaming call: writes every request, completes the request stream, then
        /// collects every streamed response.
        /// </summary>
        /// <typeparam name="TRequest">Request message type.</typeparam>
        /// <typeparam name="TResponse">Response message type.</typeparam>
        /// <param name="invoker">Call invoker used to start the call.</param>
        /// <param name="method">Method to call.</param>
        /// <param name="option">Call options (metadata).</param>
        /// <param name="requests">Request messages to stream; may be null.</param>
        /// <returns>All responses, in arrival order, together with the response headers.</returns>
        private async Task<Tuple<IList<TResponse>, Metadata>> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests) where TRequest : class where TResponse : class
        {
            using (AsyncDuplexStreamingCall<TRequest, TResponse> call = invoker.AsyncDuplexStreamingCall(method, null, option))
            {
                if (requests != null)
                {
                    foreach (TRequest request in requests)
                        await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
                }
                await call.RequestStream.CompleteAsync().ConfigureAwait(false);
                IList<TResponse> responses = new List<TResponse>();
                while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
                    responses.Add(call.ResponseStream.Current);
                return Tuple.Create(responses, await call.ResponseHeadersAsync);
            }
        }
    }

When the MethodDescriptorClient class reads the request parameters, it currently supports only StringContent (when an HttpClient POST reaches the backend, what is received is ByteArrayContent).

The client therefore calls the gateway as follows:

using (HttpClient httpClient = new HttpClient())
{
    var queryurl = queryUrl();
    if (!queryurl.EndsWith("/", StringComparison.Ordinal))
        queryurl += "/";

    // The gateway deserializes the request body as JSON, so the parameters are posted as a JSON
    // string body rather than as multipart form data.
    var postObj = new { WhereString = " no like %'1.402.03.01674'%" };
    var requestUri = string.Concat(queryurl, usegrpc ? "grpc" : "api", "/", "Supplier", "/", "QuerySupplier");

    // Both the request content and the response own resources and are disposed after use.
    using (var content = new StringContent(JsonConvert.SerializeObject(postObj), Encoding.UTF8, "application/json"))
    using (var responseMessage = httpClient.PostAsync(requestUri, content).GetAwaiter().GetResult())
    {
        // The response body is already JSON text; serializing it again would only wrap it in quotes
        // and escape every inner quote, so it is displayed as received.
        this.textBox1.Text = responseMessage.Content.ReadAsStringAsync().GetAwaiter().GetResult();
    }
}

The gateway is registered as follows:

 /// <summary>
 /// Registers the gateway services: Ocelot first, then the gRPC extension on the result of that registration.
 /// </summary>
 public void ConfigureServices(IServiceCollection services)
        {
            var ocelotBuilder = services.AddOcelot();
            ocelotBuilder.AddOcelotGrpc();
        }

The protoc program must be deployed alongside the gateway service. The second directory holds the .proto files.

 

 Git repository: Sam/Ocelot.Provider.RPC

版权声明
本文为[Tassel 1990]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/04/202204231359253190.html