Grpc gateway based on Ocelot
2022-04-23 16:54:00 【Tassel 1990】
For the original article and its ideas, see "gRpcHttp gateway based on Ocelot" on the dotNET Cross-platform blog (CSDN). This example adjusts much of that code and makes a number of improvements.
Once the gateway is set up, a request looks like this:

Approach:
1. Periodically monitor a folder that stores .proto files. (See: DirectoryMonitorBackgroundService)
2. When a file changes, call the protoc tool to generate C# code. (This example adds support for compiling several files in one run, but it does not yet handle .proto files that reference other .proto files, or duplicate class names.)
3. After the code is generated, use CSharpCompilation to compile it into the corresponding DLL.
4. Load the generated DLL, extract the MethodDescriptor objects from it, and cache them.
5. Re-implement Ocelot's IHttpRequester interface. This interface takes the DownstreamRequest, sends it to the downstream host over HTTP, and returns the data that is then forwarded to the requester.
6. When IHttpRequester handles a request, it checks whether the request is a grpc request. If so, it resolves the service name and method name and looks up the matching MethodDescriptor in the cache.
7. Implement ClientBase<T>, create the Channel, and then create the Client instance. Then call the gRPC service.
8. After obtaining the data, forward it to the requester.
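Step 6 can be sketched as follows. This is a minimal illustration, not the project's actual code: it assumes the upstream path has the shape /grpc/{Service}/{Method} (as in the client call shown later in this post), and GrpcRouteParser and DescriptorCache are hypothetical names. TDescriptor stands in for the cached MethodDescriptor so that the sketch needs no gRPC packages.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: splits "/grpc/Supplier/QuerySupplier" into service and method names.
public static class GrpcRouteParser
{
    public static bool TryParse(string path, out string service, out string method)
    {
        service = string.Empty;
        method = string.Empty;
        if (string.IsNullOrEmpty(path))
            return false;

        var parts = path.Split(new[] { '/' }, StringSplitOptions.RemoveEmptyEntries);
        // Assumed shape: exactly three segments, the first one being "grpc".
        if (parts.Length != 3 || !string.Equals(parts[0], "grpc", StringComparison.OrdinalIgnoreCase))
            return false;

        service = parts[1];
        method = parts[2];
        return true;
    }
}

// Hypothetical cache keyed by "Service/Method"; TDescriptor stands in for MethodDescriptor.
public class DescriptorCache<TDescriptor>
{
    private readonly ConcurrentDictionary<string, TDescriptor> cache =
        new ConcurrentDictionary<string, TDescriptor>(StringComparer.Ordinal);

    public void Set(string service, string method, TDescriptor descriptor)
    {
        cache[service + "/" + method] = descriptor;
    }

    public bool TryGet(string service, string method, out TDescriptor descriptor)
    {
        return cache.TryGetValue(service + "/" + method, out descriptor);
    }
}
```

When the path does not parse or the cache has no entry, the requester can fall back to plain HTTP forwarding, which is what the GrpcHttpRequester listing in this post does when GrpcRequestMethod is null.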
Some of the code is listed below.
// Purpose: for the changed .proto files, generate C# code, compile it into a DLL, then load the DLL to obtain the MethodDescriptor objects
[Serializable]
public class GrpcCodeGeneraterSubscriber : IEventSubscriber
{
private readonly ILogger<GrpcCodeGeneraterSubscriber> logger = null;
private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;
private readonly IGrpcServiceDescriptor serviceDescriptor = null;
private readonly IProtoGenerateCode protoGenerateCode = null;
public GrpcCodeGeneraterSubscriber(ILogger<GrpcCodeGeneraterSubscriber> logger, IGrpcServiceDescriptor serviceDescriptor, IProtoGenerateCode protoGenerateCode)
{
this.logger = logger;
this.serviceDescriptor = serviceDescriptor;
this.protoGenerateCode = protoGenerateCode;
}
[EventSubscribe("GrpcCodeGenerater")]
public async Task GrpcCodeGenerater(EventHandlerExecutingContext context)
{
var protefileList = context.Source.Payload as string[];
if (protefileList == null || protefileList.Length <= 0)
return;
this.protoGenerateCode.GenerateCsharpFromProto(protefileList);
foreach (var protofilepath in protefileList)
{
var protofilenamewithoutExtension = Path.GetFileNameWithoutExtension(protofilepath);
if (GenerateDllAsync(protofilenamewithoutExtension) == false)
return;
var csharp_out = Path.Combine(BaseDirectory, $"plugins/.{protofilenamewithoutExtension}");
File.WriteAllText(Path.Combine(csharp_out, $"plugin.txt"), File.GetLastWriteTime(protofilepath).ToString("yyyy-MM-dd HH:mm:ss"));
await this.serviceDescriptor.CreateGrpcDescriptorAsync(Path.Combine(csharp_out, $"{protofilenamewithoutExtension}.dll"));
}
this.logger.LogInformation($"DLL generation completed: {string.Join<string>(",", protefileList.Select(y => Path.GetFileName(y)))}");
// Delete the intermediate .cs files
foreach (string file in Directory.GetFiles(Path.Combine(BaseDirectory, "plugins"), "*.cs"))
File.Delete(file);
}
private bool GenerateDllAsync(string assemblyName)
{
var dirpath = Path.Combine(BaseDirectory, "plugins");
var dllFiles = Directory.GetFiles(dirpath, "*.cs");
if (dllFiles.Length == 0)
return false;
List<SyntaxTree> trees = new List<SyntaxTree>();
foreach (var file in dllFiles)
{
var fileName = Path.GetFileNameWithoutExtension(file).ToLower();
if (fileName != assemblyName.ToLower() && fileName != string.Concat(assemblyName, "Grpc").ToLower())
continue;
var csStr = File.ReadAllText(file);
trees.Add(CSharpSyntaxTree.ParseText(csStr, encoding: Encoding.UTF8));
}
var references2 = new[]{
MetadataReference.CreateFromFile(Assembly.Load("netstandard, Version=2.0.0.0").Location),
MetadataReference.CreateFromFile(Assembly.Load("System.Runtime, Version=0.0.0.0").Location),
MetadataReference.CreateFromFile(Assembly.Load("System.IO, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
MetadataReference.CreateFromFile(Assembly.Load("System.Memory, Version=5.0.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51").Location),
MetadataReference.CreateFromFile(Assembly.Load("System.Threading.Tasks, Version=4.0.10.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
MetadataReference.CreateFromFile(typeof(object).Assembly.Location),
MetadataReference.CreateFromFile(typeof(Google.Protobuf.ProtoPreconditions).Assembly.Location),
MetadataReference.CreateFromFile(typeof(SerializationContext).Assembly.Location),
MetadataReference.CreateFromFile(typeof(Channel).Assembly.Location)
};
var options = new CSharpCompilationOptions(outputKind: OutputKind.DynamicallyLinkedLibrary, optimizationLevel: OptimizationLevel.Debug, generalDiagnosticOption: ReportDiagnostic.Error);
var dlldir = Path.Combine(dirpath, $".{assemblyName}");
if (Directory.Exists(dlldir) == false)
Directory.CreateDirectory(dlldir);
var result2 = CSharpCompilation.Create(assemblyName, trees, references2, options).Emit(Path.Combine(dlldir, $"{assemblyName}.dll"));
this.logger.Log(result2.Success ? LogLevel.Debug : LogLevel.Error, string.Join(",", result2.Diagnostics.Select(d => string.Format("[{0}]:{1}({2})", d.Id, d.GetMessage(), d.Location.GetLineSpan().StartLinePosition))));
return result2.Success;
}
}
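The subscriber above writes the .proto file's last write time to plugin.txt next to the generated DLL. A monitor could use that stamp to decide whether a file needs regenerating. The sketch below is an assumption about how such a check might look; ProtoChangeDetector is a hypothetical name, and the real DirectoryMonitorBackgroundService is not shown in this post.

```csharp
using System;
using System.IO;

// Hypothetical helper: compares a .proto file's last write time with the stamp
// stored in plugins/.{name}/plugin.txt by the code generator.
public static class ProtoChangeDetector
{
    // Same format string the subscriber uses when writing plugin.txt.
    private const string StampFormat = "yyyy-MM-dd HH:mm:ss";

    public static bool HasChanged(string protoFilePath, string pluginsDir)
    {
        var name = Path.GetFileNameWithoutExtension(protoFilePath);
        var stampFile = Path.Combine(pluginsDir, "." + name, "plugin.txt");

        // No stamp yet: the file has never been compiled.
        if (!File.Exists(stampFile))
            return true;

        var current = File.GetLastWriteTime(protoFilePath).ToString(StampFormat);
        var recorded = File.ReadAllText(stampFile).Trim();
        return !string.Equals(current, recorded, StringComparison.Ordinal);
    }
}
```

Because the stamp has one-second resolution, two edits within the same second would look unchanged. Comparing a content hash instead would avoid that, at the cost of reading every file on each scan.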
// Purpose: call the protoc tool to generate C# code; this example generates xxx.cs and xxxGrpc.cs
public class ProtoGenerateCode : IProtoGenerateCode
{
private readonly ILogger<ProtoGenerateCode> Logger = null;
private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;
public ProtoGenerateCode(ILogger<ProtoGenerateCode> logger)
{
this.Logger = logger;
}
public void GenerateCsharpFromProto(params string[] protoPath)
{
var architecture = RuntimeInformation.OSArchitecture.ToString().ToLower();// OS architecture, e.g. x86 or x64
var bin = string.Empty;
var os = string.Empty;
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
os = "windows";
bin = ".exe";
}
else if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
os = "linux";
else if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
os = "macosx";
else
{
Logger.LogError(" This platform does not support grpctools.");
return;
}
var args = new Dictionary<string, List<string>>();
var protocPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/protoc{bin}");
var grpcPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/grpc_csharp_plugin{bin}");
string outdir = Path.Combine(BaseDirectory, "plugins");
if (Directory.Exists(outdir) == false)
Directory.CreateDirectory(outdir);
args.Add("proto_path", new List<string>(protoPath.Select(t => Path.GetDirectoryName(t)).Distinct(StringComparer.OrdinalIgnoreCase)));
args.Add("csharp_out", new List<string>(new string[] { outdir }));
if (!string.IsNullOrEmpty(grpcPath))
{
args.Add("plugin", new List<string>(new string[] { "protoc-gen-grpc=" + grpcPath }));
args.Add("grpc_out", new List<string>(new string[] { outdir }));
}
var argsValue = WriteArgs(string.Join<string>(" ", protoPath), args);// Several files in one batch, e.g. .\st\supplier.proto .\st\customer.proto
Logger.LogInformation("Running: " + protocPath + " " + argsValue);
var exitCode = RunProtoc(protocPath, argsValue, string.Empty, out string stdout, out string stderr);
if (!string.IsNullOrEmpty(stderr))
throw new InvalidOperationException(stderr);
}
private string WriteArgs(string protoFile, Dictionary<string, List<string>> args)
{
var sb = new StringBuilder();
foreach (var kvp in args)
{
foreach (var argInstance in kvp.Value)
sb.AppendFormat("--{0}={1} ", kvp.Key, argInstance);
}
sb.AppendFormat(" {0}", protoFile);
return sb.ToString();
}
static int RunProtoc(string path, string arguments, string workingDir, out string stdout, out string stderr)
{
using (var proc = new Process())
{
var psi = proc.StartInfo;
psi.FileName = path;
psi.Arguments = arguments;
if (!string.IsNullOrEmpty(workingDir))
psi.WorkingDirectory = workingDir;
psi.RedirectStandardError = psi.RedirectStandardOutput = true;
psi.UseShellExecute = false;
psi.CreateNoWindow = true;
proc.Start();
var stdoutTask = proc.StandardOutput.ReadToEndAsync();
var stderrTask = proc.StandardError.ReadToEndAsync();
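if (!proc.WaitForExit(5000))
{
// Kill is asynchronous; wait for the process to exit so that ExitCode can be read safely
try { proc.Kill(); proc.WaitForExit(); } catch { }
}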
var exitCode = proc.ExitCode;
stderr = stdout = "";
if (stdoutTask.Wait(1000))
stdout = stdoutTask.Result;
if (stderrTask.Wait(1000))
stderr = stderrTask.Result;
return exitCode;
}
}
}
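The class above joins all protoc arguments into one string, which breaks if a path contains spaces. As a hedged alternative, the arguments can be built as a list, one entry per argument. The sketch below uses the same protoc flags as the listing (--proto_path, --csharp_out, --plugin, --grpc_out); ProtocArgs is a hypothetical name and the paths in the usage note are examples only.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

// Hypothetical helper: builds the protoc arguments as separate entries
// so that no manual quoting is needed.
public static class ProtocArgs
{
    public static List<string> Build(string outDir, string grpcPluginPath, params string[] protoFiles)
    {
        var args = new List<string>();

        // One --proto_path per distinct source directory, as in the listing above.
        foreach (var dir in protoFiles.Select(f => Path.GetDirectoryName(f)).Distinct(StringComparer.OrdinalIgnoreCase))
            args.Add("--proto_path=" + dir);

        args.Add("--csharp_out=" + outDir);
        args.Add("--plugin=protoc-gen-grpc=" + grpcPluginPath);
        args.Add("--grpc_out=" + outDir);

        // The .proto files themselves come last.
        args.AddRange(protoFiles);
        return args;
    }
}
```

For example, Build("plugins", "tools/grpc_csharp_plugin", "st/supplier.proto", "st/customer.proto") yields a single --proto_path=st entry followed by the output flags and the two file names. On .NET Core 2.1 or later, each entry can be added to ProcessStartInfo.ArgumentList instead of assigning the Arguments string.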
// Re-implements the interface to forward gRPC requests.
// IndexOf("grpc") is used instead of an exact match because the request may also use https.
// 1000 is added to the port because the downstream service also supports HTTP, so the gRPC port is the HTTP port plus 1000.
[Serializable]
public class GrpcHttpRequester : IHttpRequester
{
private readonly IHttpClientCache _cacheHandlers;
private readonly IOcelotLogger _logger;
private readonly IDelegatingHandlerHandlerFactory _factory;
private readonly IExceptionToErrorMapper _mapper;
public GrpcHttpRequester(IOcelotLoggerFactory loggerFactory,
IHttpClientCache cacheHandlers,
IDelegatingHandlerHandlerFactory factory,
IExceptionToErrorMapper mapper)
{
this._logger = loggerFactory.CreateLogger<HttpClientHttpRequester>();
this._cacheHandlers = cacheHandlers;
this._factory = factory;
this._mapper = mapper;
}
public async Task<Response<HttpResponseMessage>> GetResponse(HttpContext httpContext)
{
var downstreamRequest = httpContext.Items.DownstreamRequest();
if (downstreamRequest.Scheme.IndexOf("grpc") < 0)
return await ProcessHttpResponse(httpContext);
return await ProcessGrpcResponse(httpContext);
}
private async Task<Response<HttpResponseMessage>> ProcessGrpcResponse(HttpContext httpContext)
{
try
{
GrpcRequestMessage grpcRequestMessage = await GrpcRequestMessage.FromReuqest(httpContext);
if (grpcRequestMessage == null || grpcRequestMessage.GrpcRequestMethod == null)
return await this.ProcessHttpResponse(httpContext);
//throw new NullReferenceException($"Request url:{httpContext.Request.Path.ToString()}Can't found Grpc.ServiceName & MethodName.");
var downstreamRequest = httpContext.Items.DownstreamRequest();
var options = new List<ChannelOption> { new ChannelOption("keepalive_time_ms", 60000) };
if (string.IsNullOrEmpty(grpcRequestMessage.RequestVersion) == false)
options.Add(new ChannelOption("requestVersion", grpcRequestMessage.RequestVersion));
// This is a long-lived connection. Will the lack of a connection pool be a problem under heavy load? (Reference: https://github.com/leenux/GrpcPool)
Channel channel = new Channel(downstreamRequest.Host, Convert.ToInt32(downstreamRequest.Port) + 1000, ChannelCredentials.Insecure, options);
var client = new MethodDescriptorClient(channel);
var httpResponseMessage = await client.InvokeAsync(grpcRequestMessage);
return new OkResponse<HttpResponseMessage>(httpResponseMessage);
}
catch (RpcException exception)
{
var error = _mapper.Map(exception);
return new OKButFailResponse<HttpResponseMessage>(error);
}
catch (Exception exception)
{
var error = _mapper.Map(exception);
return new ErrorResponse<HttpResponseMessage>(error);
}
}
private async Task<Response<HttpResponseMessage>> ProcessHttpResponse(HttpContext httpContext)
{
var builder = new HttpClientBuilder(_factory, _cacheHandlers, _logger);
var downstreamRoute = httpContext.Items.DownstreamRoute();
var downstreamRequest = httpContext.Items.DownstreamRequest();
var httpClient = builder.Create(downstreamRoute);
try
{
var response = await httpClient.SendAsync(downstreamRequest.ToHttpRequestMessage(), httpContext.RequestAborted);
return new OkResponse<HttpResponseMessage>(response);
}
catch (Exception exception)
{
var error = _mapper.Map(exception);
return new ErrorResponse<HttpResponseMessage>(error);
}
finally
{
builder.Save();
}
}
}
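The two conventions used by GrpcHttpRequester (a scheme that contains "grpc", and a gRPC port equal to the HTTP port plus 1000) can be isolated in a small helper. This is a sketch under assumptions: GrpcEndpoint is a hypothetical name, the scheme check here ignores case (the listing above does not), and the range check is an addition that the original code does not perform.

```csharp
using System;

// Hypothetical helper that captures the scheme and port conventions described above.
public static class GrpcEndpoint
{
    // Convention from this post: gRPC port = HTTP port + 1000.
    public const int PortOffset = 1000;

    // True for schemes such as "grpc" or "grpcs"; false for "http" or "https".
    public static bool IsGrpcScheme(string scheme)
    {
        return !string.IsNullOrEmpty(scheme)
            && scheme.IndexOf("grpc", StringComparison.OrdinalIgnoreCase) >= 0;
    }

    // Maps the downstream HTTP port to the gRPC port and rejects invalid results.
    public static int ToGrpcPort(int httpPort)
    {
        int grpcPort = httpPort + PortOffset;
        if (httpPort <= 0 || grpcPort > 65535)
            throw new ArgumentOutOfRangeException(nameof(httpPort), "The derived gRPC port would be outside 1-65535.");
        return grpcPort;
    }
}
```

Keeping the offset in one constant makes the convention explicit. A fixed offset still couples the gateway to the way downstream services are deployed, so a per-route port setting may be preferable if services cannot all follow it.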
// Purpose: performs the gRPC request; see the InvokeAsync method for details
public class MethodDescriptorClient : ClientBase<MethodDescriptorClient>
{
public MethodDescriptorClient(Channel channel)
: base(channel)
{
}
public MethodDescriptorClient(CallInvoker callInvoker)
: base(callInvoker)
{
}
public MethodDescriptorClient()
: base()
{
}
protected MethodDescriptorClient(ClientBaseConfiguration configuration)
: base(configuration)
{
}
protected override MethodDescriptorClient NewInstance(ClientBaseConfiguration configuration)
{
return new MethodDescriptorClient(configuration);
}
/// <summary>
/// InvokeAsync
/// </summary>
public Task<HttpResponseMessage> InvokeAsync(GrpcRequestMessage grpcRequestMessage)
{
var methodDescriptor = grpcRequestMessage.GrpcRequestMethod;
System.Reflection.MethodInfo m = typeof(MethodDescriptorClient).GetMethod("CallGrpcAsyncCore", System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic);
return (Task<HttpResponseMessage>)m.MakeGenericMethod(new Type[] { methodDescriptor.InputType.ClrType, methodDescriptor.OutputType.ClrType }).Invoke(this, new object[] { grpcRequestMessage });
}
private async Task<HttpResponseMessage> CallGrpcAsyncCore<TRequest, TResponse>(GrpcRequestMessage grpcRequestMessage) where TRequest : class, IMessage<TRequest> where TResponse : class, IMessage<TResponse>
{
CallOptions option = CreateCallOptions(grpcRequestMessage.Headers);
var rpc = GrpcMethodBuilder<TRequest, TResponse>.GetMethod(grpcRequestMessage.GrpcRequestMethod);
var requestMessage = await grpcRequestMessage.RequestMessage.Content.ReadAsStringAsync();
TRequest request = JsonConvert.DeserializeObject<TRequest>(requestMessage);
List<TRequest> requests = new List<TRequest>() { request };
switch (rpc.Type)
{
case MethodType.Unary:
var taskUnary = await AsyncUnaryCall(CallInvoker, rpc, option, requests.FirstOrDefault());
return await ProcessHttpResponseMessage(taskUnary.Item2, taskUnary.Item1);
case MethodType.ClientStreaming:
var taskClientStreaming = await AsyncClientStreamingCall(CallInvoker, rpc, option, requests);
return await ProcessHttpResponseMessage(taskClientStreaming.Item2, taskClientStreaming.Item1);
case MethodType.ServerStreaming:
var taskServerStreaming = await AsyncServerStreamingCall(CallInvoker, rpc, option, requests.FirstOrDefault());
return await ProcessHttpResponseMessage(taskServerStreaming.Item2, taskServerStreaming.Item1);
case MethodType.DuplexStreaming:
var taskDuplexStreaming = await AsyncDuplexStreamingCall(CallInvoker, rpc, option, requests);
return await ProcessHttpResponseMessage(taskDuplexStreaming.Item2, taskDuplexStreaming.Item1);
default:
throw new NotSupportedException($"MethodType '{rpc.Type}' is not supported.");
}
}
private Task<HttpResponseMessage> ProcessHttpResponseMessage<TResponse>(Metadata headers, params TResponse[] responses)
{
HttpResponseMessage httpResponseMessage = new HttpResponseMessage(HttpStatusCode.OK);
httpResponseMessage.Content = new StringContent(JsonConvert.SerializeObject(responses));
foreach (var entry in headers)
httpResponseMessage.Headers.Add(entry.Key, entry.Value);
return Task.FromResult(httpResponseMessage);
}
private CallOptions CreateCallOptions(HttpRequestHeaders headers)
{
Metadata meta = new Metadata();
foreach (var entry in headers)
meta.Add(entry.Key.Replace("grpc.", ""), entry.Value.FirstOrDefault());
CallOptions option = new CallOptions(meta);
return option;
}
/// <summary>
///
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <param name="invoker"></param>
/// <param name="method"></param>
/// <param name="option"></param>
/// <param name="request"></param>
/// <returns></returns>
private async Task<Tuple<TResponse, Metadata>> AsyncUnaryCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request) where TRequest : class where TResponse : class
{
using (AsyncUnaryCall<TResponse> call = invoker.AsyncUnaryCall(method, null, option, request))
{
return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
}
}
/// <summary>
///
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <param name="invoker"></param>
/// <param name="method"></param>
/// <param name="option"></param>
/// <param name="requests"></param>
/// <returns></returns>
private async Task<Tuple<TResponse, Metadata>> AsyncClientStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests) where TRequest : class where TResponse : class
{
using (AsyncClientStreamingCall<TRequest, TResponse> call = invoker.AsyncClientStreamingCall(method, null, option))
{
if (requests != null)
{
foreach (TRequest request in requests)
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
}
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
}
}
/// <summary>
///
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <param name="invoker"></param>
/// <param name="method"></param>
/// <param name="option"></param>
/// <param name="request"></param>
/// <returns></returns>
private async Task<Tuple<IList<TResponse>, Metadata>> AsyncServerStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request) where TRequest : class where TResponse : class
{
using (AsyncServerStreamingCall<TResponse> call = invoker.AsyncServerStreamingCall(method, null, option, request))
{
IList<TResponse> responses = new List<TResponse>();
while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
responses.Add(call.ResponseStream.Current);
return Tuple.Create(responses, await call.ResponseHeadersAsync);
}
}
/// <summary>
///
/// </summary>
/// <typeparam name="TRequest"></typeparam>
/// <typeparam name="TResponse"></typeparam>
/// <param name="invoker"></param>
/// <param name="method"></param>
/// <param name="option"></param>
/// <param name="requests"></param>
/// <returns></returns>
private async Task<Tuple<IList<TResponse>, Metadata>> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests) where TRequest : class where TResponse : class
{
using (AsyncDuplexStreamingCall<TRequest, TResponse> call = invoker.AsyncDuplexStreamingCall(method, null, option))
{
if (requests != null)
{
foreach (TRequest request in requests)
await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
}
await call.RequestStream.CompleteAsync().ConfigureAwait(false);
IList<TResponse> responses = new List<TResponse>();
while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
responses.Add(call.ResponseStream.Current);
return Tuple.Create(responses, await call.ResponseHeadersAsync);
}
}
}
When MethodDescriptorClient reads the request parameters from the request, only StringContent is currently supported (when HttpClient POSTs a request to the backend, what arrives is ByteArrayContent).
The client therefore calls the gateway as follows:
using (HttpClient httpClient = new HttpClient())
{
var queryurl = queryUrl();
if (queryurl.EndsWith("/") == false)
queryurl += "/";
//var content = new MultipartFormDataContent();//DateTime.Now.Ticks.ToString("X")
//content.Add(new StringContent(" no like %'1.402.03.01674'%"), "WhereString");
var postObj = new { WhereString = " no like %'1.402.03.01674'%" };
StringContent content = new StringContent(JsonConvert.SerializeObject(postObj), Encoding.UTF8, "application/json");
var postResultTask = httpClient.PostAsync(string.Concat(queryurl, usegrpc ? "grpc" : "api", "/", "Supplier", "/", "QuerySupplier"), content);
var responseMessage = postResultTask.GetAwaiter().GetResult();
this.textBox1.Text = JsonConvert.SerializeObject(responseMessage.Content.ReadAsStringAsync().GetAwaiter().GetResult());
}
The gateway is registered as follows:
public void ConfigureServices(IServiceCollection services)
{
services.AddOcelot().AddOcelotGrpc();
}
The protoc program must be placed in the gateway service's directory. The second directory holds the .proto files.

Git address: Sam/Ocelot.Provider.RPC
Copyright notice
This article was written by [Tassel 1990]. Please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/04/202204231359253190.html