当前位置:网站首页>基于Ocelot的gRpc网关
基于Ocelot的gRpc网关
2022-04-23 13:59:00 【流苏1990】
原文&思路参见(本例代码调整较多,也做了比较多的改进):基于Ocelot的gRpcHttp网关_dotNET跨平台的博客-CSDN博客
网关架设后,请求即为如下:
思路解析:
1、定时监控某个存放.proto的文件夹。(参见:DirectoryMonitorBackgroundService)
2、当文件有变动时 调用protoc工具生成C#代码。(本例增加一次性编译多个文件,但未解决引用其他proto的问题&相同类名的问题)
3、生成代码后调用CSharpCompilation实例来生成对应的DLL。
4、反编译DLL后取得MethodDescriptor对象,并缓存起来。
5、重新注入Ocelot中的IHttpRequester接口。该接口作用是 根据获取得到的DownstreamRequest采用HTTP下发下游主机请求得到数据后 在转发给请求方。
6、IHttpRequester在处理请求时判断是否包含grpc的请求值,如果是则解析出服务名与方法名,并匹配MethodDescriptor对象缓存。
7、自行实现ClientBase<T>,并在创建Channel后创建Client实例。然后调用gRP服务
8、取得数据后,转发给请求方
以下列出部分具体的代码。
//作用:根据取得的变动文件,生成c#代码,而后生成DLL,然后反编译获取MethodDescriptor
[Serializable]
public class GrpcCodeGeneraterSubscriber : IEventSubscriber
{
private readonly ILogger<GrpcCodeGeneraterSubscriber> logger = null;
private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;
private readonly IGrpcServiceDescriptor serviceDescriptor = null;
private readonly IProtoGenerateCode protoGenerateCode = null;
public GrpcCodeGeneraterSubscriber(ILogger<GrpcCodeGeneraterSubscriber> logger, IGrpcServiceDescriptor serviceDescriptor, IProtoGenerateCode protoGenerateCode)
{
this.logger = logger;
this.serviceDescriptor = serviceDescriptor;
this.protoGenerateCode = protoGenerateCode;
}
[EventSubscribe("GrpcCodeGenerater")]
public async Task GrpcCodeGenerater(EventHandlerExecutingContext context)
{
var protefileList = context.Source.Payload as string[];
if (protefileList == null || protefileList.Length <= 0)
return;
this.protoGenerateCode.GenerateCsharpFromProto(protefileList);
foreach (var protofilepath in protefileList)
{
var protofilenamewithoutExtension = Path.GetFileNameWithoutExtension(protofilepath);
if (GenerateDllAsync(protofilenamewithoutExtension) == false)
return;
var csharp_out = Path.Combine(BaseDirectory, $"plugins/.{protofilenamewithoutExtension}");
File.WriteAllText(Path.Combine(csharp_out, $"plugin.txt"), File.GetLastWriteTime(protofilepath).ToString("yyyy-MM-dd HH:mm:ss"));
await this.serviceDescriptor.CreateGrpcDescriptorAsync(Path.Combine(csharp_out, $"{protofilenamewithoutExtension}.dll"));
}
this.logger.LogInformation($" generater dll compeleted:{string.Join<string>(",", protefileList.Select(y => Path.GetFileName(y)))} ");
//删除文件
foreach (string var in Directory.GetFiles(Path.Combine(BaseDirectory, "plugins"), "*.cs"))
File.Delete(var);
}
private bool GenerateDllAsync(string assemblyName)
{
var dirpath = Path.Combine(BaseDirectory, "plugins");
var dllFiles = Directory.GetFiles(dirpath, "*.cs");
if (dllFiles.Length == 0)
return false;
List<SyntaxTree> trees = new List<SyntaxTree>();
foreach (var file in dllFiles)
{
var fileName = Path.GetFileNameWithoutExtension(file).ToLower();
if (fileName != assemblyName.ToLower() && fileName != string.Concat(assemblyName, "Grpc").ToLower())
continue;
var csStr = File.ReadAllText(file);
trees.Add(CSharpSyntaxTree.ParseText(csStr, encoding: Encoding.UTF8));
}
var references2 = new[]{
MetadataReference.CreateFromFile(Assembly.Load("netstandard, Version=2.0.0.0").Location),
MetadataReference.CreateFromFile(Assembly.Load("System.Runtime, Version=0.0.0.0").Location),
MetadataReference.CreateFromFile(Assembly.Load("System.IO, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
MetadataReference.CreateFromFile(Assembly.Load("System.Memory, Version=5.0.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51").Location),
MetadataReference.CreateFromFile(Assembly.Load("System.Threading.Tasks, Version=4.0.10.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
MetadataReference.CreateFromFile(typeof(object).Assembly.Location),
MetadataReference.CreateFromFile(typeof(Google.Protobuf.ProtoPreconditions).Assembly.Location),
MetadataReference.CreateFromFile(typeof(SerializationContext).Assembly.Location),
MetadataReference.CreateFromFile(typeof(Channel).Assembly.Location)
};
var options = new CSharpCompilationOptions(outputKind: OutputKind.DynamicallyLinkedLibrary, optimizationLevel: OptimizationLevel.Debug, generalDiagnosticOption: ReportDiagnostic.Error);
var dlldir = Path.Combine(dirpath, $".{assemblyName}");
if (Directory.Exists(dlldir) == false)
Directory.CreateDirectory(dlldir);
var result2 = CSharpCompilation.Create(assemblyName, trees, references2, options).Emit(Path.Combine(dlldir, $"{assemblyName}.dll"));
this.logger.Log(result2.Success ? LogLevel.Debug : LogLevel.Error, string.Join(",", result2.Diagnostics.Select(d => string.Format("[{0}]:{1}({2})", d.Id, d.GetMessage(), d.Location.GetLineSpan().StartLinePosition))));
return result2.Success;
}
}
//作用:调用protoc工具生成C#代码,本例生成xxx.cs & xxxGrpc.cs
public class ProtoGenerateCode : IProtoGenerateCode
{
private readonly ILogger<ProtoGenerateCode> Logger = null;
private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;
public ProtoGenerateCode(ILogger<ProtoGenerateCode> logger)
{
this.Logger = logger;
}
public void GenerateCsharpFromProto(params string[] protoPath)
{
var architecture = RuntimeInformation.OSArchitecture.ToString().ToLower();// 系统架构,x86 x64
var bin = string.Empty;
var os = string.Empty;
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
os = "windows";
bin = ".exe";
}
else if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
os = "linux";
else if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
os = "macosx";
else
{
Logger.LogError("该平台不支持grpctools.");
return;
}
var args = new Dictionary<string, List<string>>();
var protocPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/protoc{bin}");
var grpcPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/grpc_csharp_plugin{bin}");
string outdir = Path.Combine(BaseDirectory, "plugins");
if (Directory.Exists(outdir) == false)
Directory.CreateDirectory(outdir);
args.Add("proto_path", new List<string>(protoPath.Select(t => Path.GetDirectoryName(t)).Distinct(StringComparer.OrdinalIgnoreCase)));
args.Add("csharp_out", new List<string>(new string[] { outdir }));
if (!string.IsNullOrEmpty(grpcPath))
{
args.Add("plugin", new List<string>(new string[] { "protoc-gen-grpc=" + grpcPath }));
args.Add("grpc_out", new List<string>(new string[] { outdir }));
}
var argsValue = WriteArgs(string.Join<string>(" ", protoPath), args);//批量.\st\supplier.proto .\st\customer.proto
Logger.LogInformation("Running: " + protocPath + " " + argsValue);
var exitCode = RunProtoc(protocPath, argsValue, string.Empty, out string stdout, out string stderr);
if (!string.IsNullOrEmpty(stderr))
throw new InvalidOperationException(stderr);
}
private string WriteArgs(string protoFile, Dictionary<string, List<string>> args)
{
var sb = new StringBuilder();
foreach (var kvp in args)
{
foreach (var argInstance in kvp.Value)
sb.AppendFormat("--{0}={1} ", kvp.Key, argInstance);
}
sb.AppendFormat(" {0}", protoFile);
return sb.ToString();
}
static int RunProtoc(string path, string arguments, string workingDir, out string stdout, out string stderr)
{
using (var proc = new Process())
{
var psi = proc.StartInfo;
psi.FileName = path;
psi.Arguments = arguments;
if (!string.IsNullOrEmpty(workingDir))
psi.WorkingDirectory = workingDir;
psi.RedirectStandardError = psi.RedirectStandardOutput = true;
psi.UseShellExecute = false;
psi.CreateNoWindow = true;
proc.Start();
var stdoutTask = proc.StandardOutput.ReadToEndAsync();
var stderrTask = proc.StandardError.ReadToEndAsync();
if (!proc.WaitForExit(5000))
{
try { proc.Kill(); } catch { }
}
var exitCode = proc.ExitCode;
stderr = stdout = "";
if (stdoutTask.Wait(1000))
stdout = stdoutTask.Result;
if (stderrTask.Wait(1000))
stderr = stderrTask.Result;
return exitCode;
}
}
}
//重新该接口以实现RPC的转发,
//indexof('grpc')的实现,是因为有可能采用https的请求
//端口号加1000的作用是下游服务也支持http,所以grpc请求的端口就是http的端口在加上1000
[Serializable]
public class GrpcHttpRequester : IHttpRequester
{
private readonly IHttpClientCache _cacheHandlers;
private readonly IOcelotLogger _logger;
private readonly IDelegatingHandlerHandlerFactory _factory;
private readonly IExceptionToErrorMapper _mapper;
public GrpcHttpRequester(IOcelotLoggerFactory loggerFactory,
IHttpClientCache cacheHandlers,
IDelegatingHandlerHandlerFactory factory,
IExceptionToErrorMapper mapper)
{
this._logger = loggerFactory.CreateLogger<HttpClientHttpRequester>();
this._cacheHandlers = cacheHandlers;
this._factory = factory;
this._mapper = mapper;
}
public async Task<Response<HttpResponseMessage>> GetResponse(HttpContext httpContext)
{
var downstreamRequest = httpContext.Items.DownstreamRequest();
if (downstreamRequest.Scheme.IndexOf("grpc") < 0)
return await ProcessHttpResponse(httpContext);
return await ProcessGrpcResponse(httpContext);
}
private async Task<Response<HttpResponseMessage>> ProcessGrpcResponse(HttpContext httpContext)
{
try
{
GrpcRequestMessage grpcRequestMessage = await GrpcRequestMessage.FromReuqest(httpContext);
if (grpcRequestMessage == null || grpcRequestMessage.GrpcRequestMethod == null)
return await this.ProcessHttpResponse(httpContext);
//throw new NullReferenceException($"Request url:{httpContext.Request.Path.ToString()}Can't found Grpc.ServiceName & MethodName.");
var downStreamRqeust = httpContext.Items.DownstreamRequest();
var options = new List<ChannelOption> { new ChannelOption("keepalive_time_ms", 60000) };
if (string.IsNullOrEmpty(grpcRequestMessage.RequestVersion) == false)
options.Add(new ChannelOption("requestVersion", grpcRequestMessage.RequestVersion));
//这个是长链接的,不加连接池是否会有问题??(参照:https://github.com/leenux/GrpcPool)-->数据量比较大的情况下怎么处理
Channel channel = new Channel(downStreamRqeust.Host, Convert.ToInt32(downStreamRqeust.Port) + 1000, ChannelCredentials.Insecure, options);
var client = new MethodDescriptorClient(channel);
var httpResponseMessage = await client.InvokeAsync(grpcRequestMessage);
return new OkResponse<HttpResponseMessage>(httpResponseMessage);
}
catch (RpcException exception)
{
var error = _mapper.Map(exception);
return new OKButFailResponse<HttpResponseMessage>(error);
}
catch (Exception exception)
{
var error = _mapper.Map(exception);
return new ErrorResponse<HttpResponseMessage>(error);
}
}
private async Task<Response<HttpResponseMessage>> ProcessHttpResponse(HttpContext httpContext)
{
var builder = new HttpClientBuilder(_factory, _cacheHandlers, _logger);
var downstreamRoute = httpContext.Items.DownstreamRoute();
var downstreamRequest = httpContext.Items.DownstreamRequest();
var httpClient = builder.Create(downstreamRoute);
try
{
var response = await httpClient.SendAsync(downstreamRequest.ToHttpRequestMessage(), httpContext.RequestAborted);
return new OkResponse<HttpResponseMessage>(response);
}
catch (Exception exception)
{
var error = _mapper.Map(exception);
return new ErrorResponse<HttpResponseMessage>(error);
}
finally
{
builder.Save();
}
}
}
//作用:实现GRPC的请求,具体看InvokeAsync方法
public class MethodDescriptorClient : ClientBase<MethodDescriptorClient>
{
public MethodDescriptorClient(Channel channel)
: base(channel)
{
}
public MethodDescriptorClient(CallInvoker callInvoker)
: base(callInvoker)
{
}
public MethodDescriptorClient()
: base()
{
}
protected MethodDescriptorClient(ClientBaseConfiguration configuration)
: base(configuration)
{
}
protected override MethodDescriptorClient NewInstance(ClientBaseConfiguration configuration)
{
return new MethodDescriptorClient(configuration);
}
/// <summary>
/// InvokeAsync
/// </summary>
public Task<HttpResponseMessage> InvokeAsync(GrpcRequestMessage grpcRequestMessage)
{
var methodDescriptor = grpcRequestMessage.GrpcRequestMethod;
System.Reflection.MethodInfo m = typeof(MethodDescriptorClient).GetMethod("CallGrpcAsyncCore", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic);
return (Task<HttpResponseMessage>)m.MakeGenericMethod(new Type[] { methodDescriptor.InputType.ClrType, methodDescriptor.OutputType.ClrType }).Invoke(this, new object[] { grpcRequestMessage });
}
private async Task<HttpResponseMessage> CallGrpcAsyncCore<TRequest, TResponse>(GrpcRequestMessage grpcRequestMessage) where TRequest : class, IMessage<TRequest> where TResponse : class, IMessage<TResponse>
{
CallOptions option = CreateCallOptions(grpcRequestMessage.Headers);
var rpc = GrpcMethodBuilder<TRequest, TResponse>.GetMethod(grpcRequestMessage.GrpcRequestMethod);
var requestMessage = await grpcRequestMessage.RequestMessage.Content.ReadAsStringAsync();
TRequest request = JsonConvert.DeserializeObject<TRequest>(requestMessage);
List<TRequest> requests = new List<TRequest>() { request };
switch (rpc.Type)
{
case MethodType.Unary:
var taskUnary = await AsyncUnaryCall(CallInvoker, rpc, option, requests.FirstOrDefault());
return await ProcessHttpResponseMessage(taskUnary.Item2, taskUnary.Item1);
case MethodType.ClientStreaming:
var taskClientStreaming = await AsyncClientStreamingCall(CallInvoker, rpc, option, requests);
return await ProcessHttpResponseMessage(taskClientStreaming.Item2, taskClientStreaming.Item1);
case MethodType.ServerStreaming:
var taskServerStreaming = await AsyncServerStreamingCall(CallInvoker, rpc, option, requests.FirstOrDefault());
return await ProcessHttpResponseMessage(taskServerStreaming.Item2, taskServerStreaming.Item1);
case MethodType.DuplexStreaming:
var taskDuplexStreaming = await AsyncDuplexStreamingCall(CallInvoker, rpc, option, requests);
return await ProcessHttpResponseMessage(taskDuplexStreaming.Item2, taskDuplexStreaming.Item1);
default:
throw new NotSupportedException($"MethodType '{rpc.Type}' is not supported.");
}
}
private Task<HttpResponseMessage> ProcessHttpResponseMessage<TResponse>(Metadata headers, params TResponse[] responses)
{
HttpResponseMessage httpResponseMessage = new HttpResponseMessage(HttpStatusCode.OK);
httpResponseMessage.Content = new StringContent(JsonConvert.SerializeObject(responses));
foreach (var entry in headers)
httpResponseMessage.Headers.Add(entry.Key, entry.Value);
return Task.FromResult(httpResponseMessage);
}
private CallOptions CreateCallOptions(HttpRequestHeaders headers)
{
Metadata meta = new Metadata();
foreach (var entry in headers)
meta.Add(entry.Key.Replace("grpc.", ""), entry.Value.FirstOrDefault());
CallOptions option = new CallOptions(meta);
return option;
}
/// <summary>
///
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <param name="invoker"></param>
/// <param name="method"></param>
/// <param name="option"></param>
/// <param name="request"></param>
/// <returns></returns>
private async Task<Tuple<TResponse, Metadata>> AsyncUnaryCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request) where TRequest : class where TResponse : class
{
using (AsyncUnaryCall<TResponse> call = invoker.AsyncUnaryCall(method, null, option, request))
{
return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
}
}
/// <summary>
///
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <param name="invoker"></param>
/// <param name="method"></param>
/// <param name="option"></param>
/// <param name="requests"></param>
/// <returns></returns>
private async Task<Tuple<TResponse, Metadata>> AsyncClientStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests) where TRequest : class where TResponse : class
{
using (AsyncClientStreamingCall<TRequest, TResponse> call = invoker.AsyncClientStreamingCall(method, null, option))
{
if (requests != null)
{
foreach (TRequest request in requests)
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
}
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
}
}
/// <summary>
///
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <param name="invoker"></param>
/// <param name="method"></param>
/// <param name="option"></param>
/// <param name="request"></param>
/// <returns></returns>
private async Task<Tuple<IList<TResponse>, Metadata>> AsyncServerStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request) where TRequest : class where TResponse : class
{
using (AsyncServerStreamingCall<TResponse> call = invoker.AsyncServerStreamingCall(method, null, option, request))
{
IList<TResponse> responses = new List<TResponse>();
while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
responses.Add(call.ResponseStream.Current);
return Tuple.Create(responses, await call.ResponseHeadersAsync);
}
}
/// <summary>
///
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <param name="invoker"></param>
/// <param name="method"></param>
/// <param name="option"></param>
/// <param name="requests"></param>
/// <returns></returns>
private async Task<Tuple<IList<TResponse>, Metadata>> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests) where TRequest : class where TResponse : class
{
using (AsyncDuplexStreamingCall<TRequest, TResponse> call = invoker.AsyncDuplexStreamingCall(method, null, option))
{
if (requests != null)
{
foreach (TRequest request in requests)
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
}
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
IList<TResponse> responses = new List<TResponse>();
while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
responses.Add(call.ResponseStream.Current);
return Tuple.Create(responses, await call.ResponseHeadersAsync);
}
}
}
MethodDescriptorClient 类中获取Reuqest的请求参数中,目前只支持StringContent (httpClient POST的方式请求到后端,取得到的都是ByteArrayContent)
所以客户端的调用方式如下:
using (HttpClient httpClient = new HttpClient())
{
var queryurl = queryUrl();
if (queryurl.EndsWith("/") == false)
queryurl += "/";
//var content = new MultipartFormDataContent();//DateTime.Now.Ticks.ToString("X")
//content.Add(new StringContent(" no like %'1.402.03.01674'%"), "WhereString");
var postObj = new { WhereString = " no like %'1.402.03.01674'%" };
StringContent content = new StringContent(JsonConvert.SerializeObject(postObj), Encoding.UTF8, "application/json");
var postResultTask = httpClient.PostAsync(string.Concat(queryurl, usegrpc ? "grpc" : "api", "/", "Supplier", "/", "QuerySupplier"), content);
var responseMessage = postResultTask.GetAwaiter().GetResult();
this.textBox1.Text = JsonConvert.SerializeObject(responseMessage.Content.ReadAsStringAsync().GetAwaiter().GetResult());
}
网关注册如下
public void ConfigureServices(IServiceCollection services)
{
services.AddOcelot().AddOcelotGrpc();
}
网关服务需要放置protoc程序。第二个目录用来放置proto文件
Git地址:Sam/Ocelot.Provider.RPC
版权声明
本文为[流苏1990]所创,转载请带上原文链接,感谢
https://blog.csdn.net/fuweiping/article/details/124325025
边栏推荐
- Modify the Jupiter notebook style
- Record a strange bug: component copy after cache component jump
- Reading notes: meta matrix factorization for federated rating predictions
- Express ② (routing)
- Force deduction brush question 101 Symmetric binary tree
- The art of automation
- 33 million IOPs, 39 microsecond delay, carbon footprint certification, who is serious?
- UML Unified Modeling Language
- Pytorch 经典卷积神经网络 LeNet
- Jenkins construction and use
猜你喜欢
ACFs file system creation, expansion, reduction and other configuration steps
蓝绿发布、滚动发布、灰度发布,有什么区别?
Leetcode brush question 897 incremental sequential search tree
Choreographer全解析
解决方案架构师的小锦囊 - 架构图的 5 种类型
cnpm的诡异bug
How does redis solve the problems of cache avalanche, cache breakdown and cache penetration
freeCodeCamp----time_ Calculator exercise
Kettle--控件解析
2021年秋招,薪资排行NO
随机推荐
Problems encountered in the project (V) understanding of operating excel interface poi
Introduction to spark basic operation
ACFs file system creation, expansion, reduction and other configuration steps
freeCodeCamp----arithmetic_ Arranger exercise
Android篇:2019初中级Android开发社招面试解答(中
村上春树 --《当我谈跑步时,我谈些什么》句子摘录
Analysis of cluster component gpnp failed to start successfully in RAC environment
Express②(路由)
pycharm Install packages failed
Function executes only the once function for the first time
Use future and countdownlatch to realize multithreading to execute multiple asynchronous tasks, and return results after all tasks are completed
10g database cannot be started when using large memory host
Quartus Prime硬件实验开发(DE2-115板)实验二功能可调综合计时器设计
JS 力扣刷题 102. 二叉树的层序遍历
MySQL [SQL performance analysis + SQL tuning]
MySQL [acid + isolation level + redo log + undo log]
JS 烧脑面试题大赏
Un modèle universel pour la construction d'un modèle d'apprentissage scikit
[code analysis (2)] communication efficient learning of deep networks from decentralized data
UML统一建模语言