当前位置:网站首页>基于Ocelot的gRpc网关

基于Ocelot的gRpc网关

2022-04-23 13:59:00 流苏1990

原文&思路参见(本例代码调整较多,也做了比较多的改进):基于Ocelot的gRpcHttp网关_dotNET跨平台的博客-CSDN博客

网关架设后,请求即为如下:

[图片:网关请求示意图(原图未能保留)]

思路解析:

1、定时监控某个存放.proto的文件夹。(参见:DirectoryMonitorBackgroundService)

2、当文件有变动时 调用protoc工具生成C#代码。(本例增加一次性编译多个文件,但未解决引用其他proto的问题&相同类名的问题)

3、生成代码后调用CSharpCompilation实例来生成对应的DLL。

4、加载生成的DLL,通过反射取得MethodDescriptor对象,并缓存起来。

5、重新注入Ocelot中的IHttpRequester接口。该接口的作用是:根据获取到的DownstreamRequest,采用HTTP向下游主机发起请求,得到数据后再转发给请求方。

6、IHttpRequester在处理请求时判断是否包含grpc的请求值,如果是则解析出服务名与方法名,并匹配MethodDescriptor对象缓存。

7、自行实现ClientBase<T>,并在创建Channel后创建Client实例,然后调用gRPC服务。

8、取得数据后,转发给请求方

以下列出部分具体的代码。

// Purpose: for each changed .proto file, generate C# code, compile it into a DLL,
// and hand that DLL to IGrpcServiceDescriptor so the gRPC method descriptors can be built.
[Serializable]
    public class GrpcCodeGeneraterSubscriber : IEventSubscriber
    {
        private readonly ILogger<GrpcCodeGeneraterSubscriber> logger = null;
        private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;
        private readonly IGrpcServiceDescriptor serviceDescriptor = null;
        private readonly IProtoGenerateCode protoGenerateCode = null;

        public GrpcCodeGeneraterSubscriber(ILogger<GrpcCodeGeneraterSubscriber> logger, IGrpcServiceDescriptor serviceDescriptor, IProtoGenerateCode protoGenerateCode)
        {
            this.logger = logger;
            this.serviceDescriptor = serviceDescriptor;
            this.protoGenerateCode = protoGenerateCode;
        }

        /// <summary>
        /// Handles the "GrpcCodeGenerater" event. The payload is expected to be a <c>string[]</c> of
        /// .proto file paths; any other payload (or an empty array) is ignored.
        /// For every file: runs protoc, compiles the generated sources into
        /// <c>plugins/.{name}/{name}.dll</c>, writes the proto's last-write time to <c>plugin.txt</c>
        /// and registers the DLL with the service descriptor.
        /// Processing stops at the first file that fails to compile.
        /// </summary>
        /// <param name="context">Event context whose <c>Source.Payload</c> carries the proto file paths.</param>
        [EventSubscribe("GrpcCodeGenerater")]
        public async Task GrpcCodeGenerater(EventHandlerExecutingContext context)
        {
            var protefileList = context.Source.Payload as string[];
            if (protefileList == null || protefileList.Length == 0)
                return;

            var pluginsDir = Path.Combine(BaseDirectory, "plugins");
            try
            {
                this.protoGenerateCode.GenerateCsharpFromProto(protefileList);
                foreach (var protofilepath in protefileList)
                {
                    var protoName = Path.GetFileNameWithoutExtension(protofilepath);
                    if (!GenerateDll(protoName))
                        return;

                    var csharp_out = Path.Combine(BaseDirectory, $"plugins/.{protoName}");
                    File.WriteAllText(Path.Combine(csharp_out, "plugin.txt"), File.GetLastWriteTime(protofilepath).ToString("yyyy-MM-dd HH:mm:ss"));
                    await this.serviceDescriptor.CreateGrpcDescriptorAsync(Path.Combine(csharp_out, $"{protoName}.dll"));
                }

                // Pass the file list as an argument so braces in file names are never parsed as a template.
                this.logger.LogInformation(" generater dll compeleted:{Files} ", string.Join(",", protefileList.Select(y => Path.GetFileName(y))));
            }
            finally
            {
                // Always remove the generated sources, including on failure or exception,
                // so stale .cs files never leak into the next generation run.
                if (Directory.Exists(pluginsDir))
                {
                    foreach (string generatedSource in Directory.GetFiles(pluginsDir, "*.cs"))
                        File.Delete(generatedSource);
                }
            }
        }

        /// <summary>
        /// Compiles the generated <c>{assemblyName}.cs</c> and <c>{assemblyName}Grpc.cs</c> files found in
        /// the plugins directory into <c>plugins/.{assemblyName}/{assemblyName}.dll</c>.
        /// </summary>
        /// <param name="assemblyName">Proto file name without extension; also used as the assembly name.</param>
        /// <returns><c>true</c> when the assembly was emitted successfully; otherwise <c>false</c>.</returns>
        private bool GenerateDll(string assemblyName)
        {
            var dirpath = Path.Combine(BaseDirectory, "plugins");
            var sourceFiles = Directory.GetFiles(dirpath, "*.cs");
            if (sourceFiles.Length == 0)
                return false;

            var grpcSourceName = assemblyName + "Grpc";
            List<SyntaxTree> trees = new List<SyntaxTree>();
            foreach (var file in sourceFiles)
            {
                // Ordinal, case-insensitive match: file names are identifiers, not linguistic text.
                var fileName = Path.GetFileNameWithoutExtension(file);
                if (!string.Equals(fileName, assemblyName, StringComparison.OrdinalIgnoreCase)
                    && !string.Equals(fileName, grpcSourceName, StringComparison.OrdinalIgnoreCase))
                    continue;
                trees.Add(CSharpSyntaxTree.ParseText(File.ReadAllText(file), encoding: Encoding.UTF8));
            }

            if (trees.Count == 0)
            {
                // Without this guard an empty assembly would be emitted and reported as a success.
                this.logger.LogError("No generated source file matches '{AssemblyName}' in '{Directory}'.", assemblyName, dirpath);
                return false;
            }

            var references = new[]{
                MetadataReference.CreateFromFile(Assembly.Load("netstandard, Version=2.0.0.0").Location),
                MetadataReference.CreateFromFile(Assembly.Load("System.Runtime, Version=0.0.0.0").Location),
                MetadataReference.CreateFromFile(Assembly.Load("System.IO, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
                MetadataReference.CreateFromFile(Assembly.Load("System.Memory, Version=5.0.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51").Location),
                MetadataReference.CreateFromFile(Assembly.Load("System.Threading.Tasks, Version=4.0.10.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
                MetadataReference.CreateFromFile(typeof(object).Assembly.Location),
                MetadataReference.CreateFromFile(typeof(Google.Protobuf.ProtoPreconditions).Assembly.Location),
                MetadataReference.CreateFromFile(typeof(SerializationContext).Assembly.Location),
                MetadataReference.CreateFromFile(typeof(Channel).Assembly.Location)
            };
            var options = new CSharpCompilationOptions(outputKind: OutputKind.DynamicallyLinkedLibrary, optimizationLevel: OptimizationLevel.Debug, generalDiagnosticOption: ReportDiagnostic.Error);

            var dlldir = Path.Combine(dirpath, $".{assemblyName}");
            if (!Directory.Exists(dlldir))
                Directory.CreateDirectory(dlldir);

            var emitResult = CSharpCompilation.Create(assemblyName, trees, references, options).Emit(Path.Combine(dlldir, $"{assemblyName}.dll"));
            var diagnostics = string.Join(",", emitResult.Diagnostics.Select(d => string.Format("[{0}]:{1}({2})", d.Id, d.GetMessage(), d.Location.GetLineSpan().StartLinePosition)));
            // Diagnostics may contain '{' / '}' (e.g. "} expected"); log them as an argument, not as the template.
            this.logger.Log(emitResult.Success ? LogLevel.Debug : LogLevel.Error, "{Diagnostics}", diagnostics);
            return emitResult.Success;
        }
    }
// Purpose: invokes the protoc tool to generate C# code; this sample produces xxx.cs and xxxGrpc.cs.
public class ProtoGenerateCode : IProtoGenerateCode
    {
        /// <summary>Maximum time protoc is allowed to run before it is killed.</summary>
        private const int ProtocTimeoutMilliseconds = 5000;
        /// <summary>Maximum time to wait for the redirected output streams (and a killed process) to finish.</summary>
        private const int DrainTimeoutMilliseconds = 1000;

        private readonly ILogger<ProtoGenerateCode> Logger = null;
        private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;

        public ProtoGenerateCode(ILogger<ProtoGenerateCode> logger)
        {
            this.Logger = logger;
        }

        /// <summary>
        /// Runs protoc (with the grpc_csharp_plugin) over the given .proto files and writes the
        /// generated C# sources into the <c>plugins</c> directory under the application base directory.
        /// The tool binaries are looked up under <c>tools/{os}_{architecture}/</c>.
        /// </summary>
        /// <param name="protoPath">Paths of the .proto files to compile; null or empty is a no-op.</param>
        /// <exception cref="InvalidOperationException">protoc exited with a non-zero code or had to be killed after the timeout.</exception>
        public void GenerateCsharpFromProto(params string[] protoPath)
        {
            if (protoPath == null || protoPath.Length == 0)
                return;

            var architecture = RuntimeInformation.OSArchitecture.ToString().ToLowerInvariant();// OS architecture, e.g. x86 / x64
            var bin = string.Empty;
            string os;
            if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
            {
                os = "windows";
                bin = ".exe";
            }
            else if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
                os = "linux";
            else if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
                os = "macosx";
            else
            {
                Logger.LogError("该平台不支持grpctools.");
                return;
            }

            var protocPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/protoc{bin}");
            var grpcPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/grpc_csharp_plugin{bin}");
            string outdir = Path.Combine(BaseDirectory, "plugins");
            if (!Directory.Exists(outdir))
                Directory.CreateDirectory(outdir);

            var arguments = new List<string>();
            foreach (var dir in protoPath.Select(t => Path.GetDirectoryName(t)).Distinct(StringComparer.OrdinalIgnoreCase))
            {
                // A bare file name has no directory part; it is resolved against the current directory.
                arguments.Add("--proto_path=" + (string.IsNullOrEmpty(dir) ? "." : dir));
            }
            arguments.Add("--csharp_out=" + outdir);
            // grpcPath comes from Path.Combine and is never empty, so the plugin is always requested.
            arguments.Add("--plugin=protoc-gen-grpc=" + grpcPath);
            arguments.Add("--grpc_out=" + outdir);
            arguments.AddRange(protoPath);// batch, e.g. .\st\supplier.proto .\st\customer.proto

            // Every argument is quoted so that paths containing spaces survive command-line parsing.
            var argsValue = string.Join(" ", arguments.Select(Quote));
            Logger.LogInformation("Running: {ProtocPath} {Arguments}", protocPath, argsValue);

            var exitCode = RunProtoc(protocPath, argsValue, string.Empty, out string stdout, out string stderr);
            if (exitCode != 0)
                throw new InvalidOperationException(string.IsNullOrEmpty(stderr) ? $"protoc failed with exit code {exitCode}." : stderr);

            // protoc prints warnings to stderr while still exiting with 0; surface them without failing.
            if (!string.IsNullOrEmpty(stderr))
                Logger.LogWarning("{ProtocOutput}", stderr);
        }

        /// <summary>
        /// Wraps <paramref name="value"/> in double quotes for use in <c>ProcessStartInfo.Arguments</c>.
        /// Trailing backslashes are doubled so they do not escape the closing quote.
        /// </summary>
        private static string Quote(string value)
        {
            int trailingBackslashes = 0;
            while (trailingBackslashes < value.Length && value[value.Length - 1 - trailingBackslashes] == '\\')
                trailingBackslashes++;
            return "\"" + value + new string('\\', trailingBackslashes) + "\"";
        }

        /// <summary>
        /// Starts the process at <paramref name="path"/> with redirected output and waits for it to exit.
        /// </summary>
        /// <param name="path">Executable to run.</param>
        /// <param name="arguments">Command-line arguments.</param>
        /// <param name="workingDir">Working directory; ignored when null or empty.</param>
        /// <param name="stdout">Captured standard output, or empty if it could not be read in time.</param>
        /// <param name="stderr">Captured standard error, or empty if it could not be read in time.</param>
        /// <returns>The process exit code, or -1 when the process could not be terminated after the timeout.</returns>
        static int RunProtoc(string path, string arguments, string workingDir, out string stdout, out string stderr)
        {
            using (var proc = new Process())
            {
                var psi = proc.StartInfo;
                psi.FileName = path;
                psi.Arguments = arguments;
                if (!string.IsNullOrEmpty(workingDir))
                    psi.WorkingDirectory = workingDir;
                psi.RedirectStandardError = psi.RedirectStandardOutput = true;
                psi.UseShellExecute = false;
                psi.CreateNoWindow = true;
                proc.Start();

                // Read both streams concurrently so a full pipe buffer cannot block the child process.
                var stdoutTask = proc.StandardOutput.ReadToEndAsync();
                var stderrTask = proc.StandardError.ReadToEndAsync();

                if (!proc.WaitForExit(ProtocTimeoutMilliseconds))
                {
                    try
                    {
                        proc.Kill();
                    }
                    catch (InvalidOperationException)
                    {
                        // The process exited between the timeout and the kill request.
                    }
                    catch (System.ComponentModel.Win32Exception)
                    {
                        // The process is already terminating or could not be terminated.
                    }
                    // Kill is asynchronous: wait so that ExitCode can be read safely afterwards.
                    proc.WaitForExit(DrainTimeoutMilliseconds);
                }

                stderr = stdout = "";
                if (stdoutTask.Wait(DrainTimeoutMilliseconds))
                    stdout = stdoutTask.Result;
                if (stderrTask.Wait(DrainTimeoutMilliseconds))
                    stderr = stderrTask.Result;

                return proc.HasExited ? proc.ExitCode : -1;
            }
        }
    }
// Re-implements Ocelot's IHttpRequester so that gRPC requests can be forwarded.
// The scheme is tested with IndexOf("grpc") because the request may also be made over https.
// The gRPC port is the downstream HTTP port plus 1000: the downstream service also serves HTTP,
// so its gRPC endpoint listens on (HTTP port + 1000).
[Serializable]
    public class GrpcHttpRequester : IHttpRequester
    {
        /// <summary>Offset added to the downstream HTTP port to obtain the gRPC port.</summary>
        private const int GrpcPortOffset = 1000;

        private readonly IHttpClientCache _cacheHandlers;
        private readonly IOcelotLogger _logger;
        private readonly IDelegatingHandlerHandlerFactory _factory;
        private readonly IExceptionToErrorMapper _mapper;


        public GrpcHttpRequester(IOcelotLoggerFactory loggerFactory,
            IHttpClientCache cacheHandlers,
            IDelegatingHandlerHandlerFactory factory,
            IExceptionToErrorMapper mapper)
        {
            // Log under this class's own category rather than Ocelot's HttpClientHttpRequester.
            this._logger = loggerFactory.CreateLogger<GrpcHttpRequester>();
            this._cacheHandlers = cacheHandlers;
            this._factory = factory;
            this._mapper = mapper;
        }

        /// <summary>
        /// Sends the downstream request: over gRPC when the downstream scheme contains "grpc",
        /// otherwise over plain HTTP.
        /// </summary>
        /// <param name="httpContext">Current request context holding Ocelot's downstream request/route items.</param>
        /// <returns>The downstream response wrapped in an Ocelot <c>Response</c>.</returns>
        public async Task<Response<HttpResponseMessage>> GetResponse(HttpContext httpContext)
        {
            var downstreamRequest = httpContext.Items.DownstreamRequest();
            // URI schemes are case-insensitive identifiers, so compare ordinally and ignore case.
            if (downstreamRequest.Scheme.IndexOf("grpc", StringComparison.OrdinalIgnoreCase) < 0)
                return await ProcessHttpResponse(httpContext);
            return await ProcessGrpcResponse(httpContext);
        }

        /// <summary>
        /// Forwards the request as a gRPC call. Falls back to plain HTTP when no gRPC method could be
        /// resolved from the request. Exceptions are mapped to Ocelot errors instead of being thrown.
        /// </summary>
        private async Task<Response<HttpResponseMessage>> ProcessGrpcResponse(HttpContext httpContext)
        {
            Channel channel = null;
            try
            {
                GrpcRequestMessage grpcRequestMessage = await GrpcRequestMessage.FromReuqest(httpContext);
                if (grpcRequestMessage == null || grpcRequestMessage.GrpcRequestMethod == null)
                    return await this.ProcessHttpResponse(httpContext);

                var downstreamRequest = httpContext.Items.DownstreamRequest();
                var options = new List<ChannelOption> { new ChannelOption("keepalive_time_ms", 60000) };
                if (!string.IsNullOrEmpty(grpcRequestMessage.RequestVersion))
                    options.Add(new ChannelOption("requestVersion", grpcRequestMessage.RequestVersion));

                // A channel is a long-lived connection. One is created per request here, so it must be
                // shut down afterwards (see finally); pooling channels (e.g. https://github.com/leenux/GrpcPool)
                // would be the next step for high-volume traffic.
                channel = new Channel(downstreamRequest.Host, Convert.ToInt32(downstreamRequest.Port) + GrpcPortOffset, ChannelCredentials.Insecure, options);
                var client = new MethodDescriptorClient(channel);
                var httpResponseMessage = await client.InvokeAsync(grpcRequestMessage);
                return new OkResponse<HttpResponseMessage>(httpResponseMessage);
            }
            catch (RpcException exception)
            {
                var error = _mapper.Map(exception);
                return new OKButFailResponse<HttpResponseMessage>(error);
            }
            catch (Exception exception)
            {
                var error = _mapper.Map(exception);
                return new ErrorResponse<HttpResponseMessage>(error);
            }
            finally
            {
                // The awaited call has completed by the time control reaches here,
                // so releasing the connection cannot cut off an in-flight call.
                if (channel != null)
                    await channel.ShutdownAsync();
            }
        }

        /// <summary>
        /// Forwards the request over HTTP using an Ocelot-built <c>HttpClient</c>; exceptions are
        /// mapped to Ocelot errors. The builder is always saved back to the cache.
        /// </summary>
        private async Task<Response<HttpResponseMessage>> ProcessHttpResponse(HttpContext httpContext)
        {
            var builder = new HttpClientBuilder(_factory, _cacheHandlers, _logger);
            var downstreamRoute = httpContext.Items.DownstreamRoute();
            var downstreamRequest = httpContext.Items.DownstreamRequest();
            var httpClient = builder.Create(downstreamRoute);
            try
            {
                var response = await httpClient.SendAsync(downstreamRequest.ToHttpRequestMessage(), httpContext.RequestAborted);
                return new OkResponse<HttpResponseMessage>(response);
            }
            catch (Exception exception)
            {
                var error = _mapper.Map(exception);
                return new ErrorResponse<HttpResponseMessage>(error);
            }
            finally
            {
                builder.Save();
            }
        }
    }
 // Purpose: performs the actual gRPC request; see InvokeAsync for the entry point.
public class MethodDescriptorClient : ClientBase<MethodDescriptorClient>
    {
        // Resolved once: the generic core method is looked up by reflection because the
        // request/response CLR types are only known at run time.
        private static readonly System.Reflection.MethodInfo CallGrpcAsyncCoreMethod =
            typeof(MethodDescriptorClient).GetMethod(nameof(CallGrpcAsyncCore), System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic);

        public MethodDescriptorClient(Channel channel)
            : base(channel)
        {
        }

        public MethodDescriptorClient(CallInvoker callInvoker)
            : base(callInvoker)
        {
        }

        public MethodDescriptorClient()
            : base()
        {
        }

        protected MethodDescriptorClient(ClientBaseConfiguration configuration)
            : base(configuration)
        {
        }

        protected override MethodDescriptorClient NewInstance(ClientBaseConfiguration configuration)
        {
            return new MethodDescriptorClient(configuration);
        }

        /// <summary>
        /// Invokes the gRPC method described by <c>grpcRequestMessage.GrpcRequestMethod</c> and
        /// converts the result into an <see cref="HttpResponseMessage"/> with a JSON body.
        /// </summary>
        /// <param name="grpcRequestMessage">Request carrying the method descriptor, the headers and the HTTP body.</param>
        /// <returns>HTTP 200 response whose body is the JSON-serialized gRPC response(s).</returns>
        /// <exception cref="ArgumentNullException"><paramref name="grpcRequestMessage"/> is null.</exception>
        public Task<HttpResponseMessage> InvokeAsync(GrpcRequestMessage grpcRequestMessage)
        {
            if (grpcRequestMessage == null)
                throw new ArgumentNullException(nameof(grpcRequestMessage));

            var methodDescriptor = grpcRequestMessage.GrpcRequestMethod;
            var closedMethod = CallGrpcAsyncCoreMethod.MakeGenericMethod(methodDescriptor.InputType.ClrType, methodDescriptor.OutputType.ClrType);
            return (Task<HttpResponseMessage>)closedMethod.Invoke(this, new object[] { grpcRequestMessage });
        }

        /// <summary>
        /// Strongly-typed core of <see cref="InvokeAsync"/>: deserializes the JSON body into
        /// <typeparamref name="TRequest"/>, dispatches on the method type and wraps the result.
        /// </summary>
        /// <exception cref="NotSupportedException">The method type is not one of the four gRPC call types.</exception>
        private async Task<HttpResponseMessage> CallGrpcAsyncCore<TRequest, TResponse>(GrpcRequestMessage grpcRequestMessage) where TRequest : class, IMessage<TRequest> where TResponse : class, IMessage<TResponse>
        {
            CallOptions option = CreateCallOptions(grpcRequestMessage.Headers);
            var rpc = GrpcMethodBuilder<TRequest, TResponse>.GetMethod(grpcRequestMessage.GrpcRequestMethod);

            // A request without a body (or with an empty one) deserializes to null; send a default
            // message instead of passing null to the call invoker.
            var content = grpcRequestMessage.RequestMessage.Content;
            var requestJson = content == null ? string.Empty : await content.ReadAsStringAsync();
            TRequest request = JsonConvert.DeserializeObject<TRequest>(requestJson) ?? Activator.CreateInstance<TRequest>();

            switch (rpc.Type)
            {
                case MethodType.Unary:
                    var taskUnary = await AsyncUnaryCall(CallInvoker, rpc, option, request);
                    return await ProcessHttpResponseMessage(taskUnary.Item2, taskUnary.Item1);
                case MethodType.ClientStreaming:
                    var taskClientStreaming = await AsyncClientStreamingCall(CallInvoker, rpc, option, new[] { request });
                    return await ProcessHttpResponseMessage(taskClientStreaming.Item2, taskClientStreaming.Item1);
                case MethodType.ServerStreaming:
                    var taskServerStreaming = await AsyncServerStreamingCall(CallInvoker, rpc, option, request);
                    return await ProcessHttpResponseMessage(taskServerStreaming.Item2, taskServerStreaming.Item1);
                case MethodType.DuplexStreaming:
                    var taskDuplexStreaming = await AsyncDuplexStreamingCall(CallInvoker, rpc, option, new[] { request });
                    return await ProcessHttpResponseMessage(taskDuplexStreaming.Item2, taskDuplexStreaming.Item1);
                default:
                    throw new NotSupportedException($"MethodType '{rpc.Type}' is not supported.");
            }
        }

        /// <summary>
        /// Builds an HTTP 200 response whose body is the JSON serialization of <paramref name="responses"/>
        /// and whose headers are copied from the gRPC response metadata.
        /// </summary>
        private Task<HttpResponseMessage> ProcessHttpResponseMessage<TResponse>(Metadata headers, params TResponse[] responses)
        {
            HttpResponseMessage httpResponseMessage = new HttpResponseMessage(HttpStatusCode.OK);
            httpResponseMessage.Content = new StringContent(JsonConvert.SerializeObject(responses));
            if (headers != null)
            {
                foreach (var entry in headers)
                {
                    // Binary ("-bin") entries have no string value; reading Value on them throws.
                    if (entry.IsBinary)
                        continue;
                    // Do not let a header name that HttpResponseHeaders rejects fail the whole response.
                    httpResponseMessage.Headers.TryAddWithoutValidation(entry.Key, entry.Value);
                }
            }
            return Task.FromResult(httpResponseMessage);
        }

        /// <summary>
        /// Converts the incoming HTTP request headers into gRPC call metadata. The "grpc." marker is
        /// stripped from header names and only the first value of each header is forwarded.
        /// </summary>
        private CallOptions CreateCallOptions(HttpRequestHeaders headers)
        {
            Metadata meta = new Metadata();
            if (headers != null)
            {
                foreach (var entry in headers)
                {
                    var value = entry.Value.FirstOrDefault();
                    if (value == null)
                        continue;
                    try
                    {
                        meta.Add(entry.Key.Replace("grpc.", ""), value);
                    }
                    catch (ArgumentException)
                    {
                        // Header name is not a valid text-metadata key (e.g. "-bin" suffix or
                        // unsupported characters); skip it rather than failing the whole call.
                    }
                }
            }
            return new CallOptions(meta);
        }

        /// <summary>
        /// Performs a unary call: one request, one response.
        /// </summary>
        /// <typeparam name="TRequest">Request message type.</typeparam>
        /// <typeparam name="TResponse">Response message type.</typeparam>
        /// <param name="invoker">Call invoker used to start the call.</param>
        /// <param name="method">gRPC method definition.</param>
        /// <param name="option">Call options (metadata).</param>
        /// <param name="request">Request message.</param>
        /// <returns>The response together with the response headers.</returns>
        private async Task<Tuple<TResponse, Metadata>> AsyncUnaryCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request) where TRequest : class where TResponse : class
        {
            using (AsyncUnaryCall<TResponse> call = invoker.AsyncUnaryCall(method, null, option, request))
            {
                return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
            }
        }

        /// <summary>
        /// Performs a client-streaming call: writes every request, completes the stream, then awaits
        /// the single response.
        /// </summary>
        /// <typeparam name="TRequest">Request message type.</typeparam>
        /// <typeparam name="TResponse">Response message type.</typeparam>
        /// <param name="invoker">Call invoker used to start the call.</param>
        /// <param name="method">gRPC method definition.</param>
        /// <param name="option">Call options (metadata).</param>
        /// <param name="requests">Request messages to stream; may be null.</param>
        /// <returns>The response together with the response headers.</returns>
        private async Task<Tuple<TResponse, Metadata>> AsyncClientStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests) where TRequest : class where TResponse : class
        {
            using (AsyncClientStreamingCall<TRequest, TResponse> call = invoker.AsyncClientStreamingCall(method, null, option))
            {
                if (requests != null)
                {
                    foreach (TRequest request in requests)
                        await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
                }
                await call.RequestStream.CompleteAsync().ConfigureAwait(false);
                return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
            }
        }

        /// <summary>
        /// Performs a server-streaming call: sends one request and collects every streamed response.
        /// </summary>
        /// <typeparam name="TRequest">Request message type.</typeparam>
        /// <typeparam name="TResponse">Response message type.</typeparam>
        /// <param name="invoker">Call invoker used to start the call.</param>
        /// <param name="method">gRPC method definition.</param>
        /// <param name="option">Call options (metadata).</param>
        /// <param name="request">Request message.</param>
        /// <returns>All responses, in arrival order, together with the response headers.</returns>
        private async Task<Tuple<IList<TResponse>, Metadata>> AsyncServerStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request) where TRequest : class where TResponse : class
        {
            using (AsyncServerStreamingCall<TResponse> call = invoker.AsyncServerStreamingCall(method, null, option, request))
            {
                IList<TResponse> responses = new List<TResponse>();
                while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
                    responses.Add(call.ResponseStream.Current);
                return Tuple.Create(responses, await call.ResponseHeadersAsync);
            }
        }

        /// <summary>
        /// Performs a duplex-streaming call: writes every request, completes the request stream, then
        /// collects every streamed response.
        /// </summary>
        /// <typeparam name="TRequest">Request message type.</typeparam>
        /// <typeparam name="TResponse">Response message type.</typeparam>
        /// <param name="invoker">Call invoker used to start the call.</param>
        /// <param name="method">gRPC method definition.</param>
        /// <param name="option">Call options (metadata).</param>
        /// <param name="requests">Request messages to stream; may be null.</param>
        /// <returns>All responses, in arrival order, together with the response headers.</returns>
        private async Task<Tuple<IList<TResponse>, Metadata>> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests) where TRequest : class where TResponse : class
        {
            using (AsyncDuplexStreamingCall<TRequest, TResponse> call = invoker.AsyncDuplexStreamingCall(method, null, option))
            {
                if (requests != null)
                {
                    foreach (TRequest request in requests)
                        await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
                }
                await call.RequestStream.CompleteAsync().ConfigureAwait(false);
                IList<TResponse> responses = new List<TResponse>();
                while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
                    responses.Add(call.ResponseStream.Current);
                return Tuple.Create(responses, await call.ResponseHeadersAsync);
            }
        }
    }

MethodDescriptorClient 类在获取 Request 的请求参数时,目前只支持 StringContent(通过 httpClient 以 POST 方式请求到后端时,后端取到的都是 ByteArrayContent)。

所以客户端的调用方式如下:

// Client-side example: POST a JSON body to the gateway, via either the gRPC route or the plain API route.
using (HttpClient httpClient = new HttpClient())
{
    var queryurl = queryUrl();
    if (!queryurl.EndsWith("/", StringComparison.Ordinal))
        queryurl += "/";

    // Multipart form content is not used; the body is sent as a JSON StringContent instead.
    //var content = new MultipartFormDataContent();//DateTime.Now.Ticks.ToString("X")
    //content.Add(new StringContent(" no like %'1.402.03.01674'%"), "WhereString");

    var postObj = new { WhereString = " no like %'1.402.03.01674'%" };
    var requestUrl = string.Concat(queryurl, usegrpc ? "grpc" : "api", "/", "Supplier", "/", "QuerySupplier");

    // Dispose the request content and the response so their buffers/connections are released promptly.
    using (StringContent content = new StringContent(JsonConvert.SerializeObject(postObj), Encoding.UTF8, "application/json"))
    using (var responseMessage = httpClient.PostAsync(requestUrl, content).GetAwaiter().GetResult())
    {
        // NOTE(review): these calls block the calling thread; switch to await if the enclosing method can be async.
        var responseText = responseMessage.Content.ReadAsStringAsync().GetAwaiter().GetResult();
        // SerializeObject on a string yields a quoted, escaped JSON string literal for display.
        this.textBox1.Text = JsonConvert.SerializeObject(responseText);
    }
}

网关注册如下

 /// <summary>
 /// Registers Ocelot and then the gRPC extension on the builder that AddOcelot returns.
 /// </summary>
 public void ConfigureServices(IServiceCollection services)
        {
            var ocelotBuilder = services.AddOcelot();
            ocelotBuilder.AddOcelotGrpc();
        }

网关服务需要放置protoc程序。第二个目录用来放置proto文件

 

 Git地址:Sam/Ocelot.Provider.RPC

版权声明
本文为[流苏1990]所创,转载请带上原文链接,感谢
https://blog.csdn.net/fuweiping/article/details/124325025