当前位置:网站首页>Grpc gateway based on Ocelot
Grpc gateway based on Ocelot
2022-04-23 16:54:00 【Tassel 1990】
For the original article and the underlying idea, see "gRPC HTTP gateway based on Ocelot" on the dotNET Cross-Platform blog (CSDN). The code in this example has been adjusted in many places and contains a number of improvements.
Once the gateway is set up, a request looks like this:
Design overview:
1. Periodically monitor a folder that stores the .proto files. (See: DirectoryMonitorBackgroundService)
2. When a file changes, call the protoc tool to generate C# code. (This example compiles several files in one pass; however, .proto files that reference other .proto files, and classes that share the same name, are not handled yet.)
3. After the code has been generated, use a CSharpCompilation instance to build the corresponding DLL.
4. Load the generated DLL, extract the MethodDescriptor objects from it, and cache them.
5. Replace Ocelot's IHttpRequester implementation. The job of this interface is to send the DownstreamRequest to the downstream host over HTTP and obtain the data, which is then forwarded to the requester.
6. When IHttpRequester handles a request, it checks whether the request is a gRPC request; if so, it resolves the service name and method name and looks up the matching MethodDescriptor in the cache.
7. Implement ClientBase<T>, create a Channel, then create the client instance and call the gRPC service.
8. After the data has been obtained, forward it to the requester.
Some of the specific code is listed below.
// Purpose: for the changed .proto files, generate C# code, compile it into a DLL, then load the DLL to obtain the MethodDescriptor objects.
/// <summary>
/// Event subscriber that reacts to changed .proto files: it asks <see cref="IProtoGenerateCode"/> to
/// generate C# sources, compiles the sources belonging to each proto file into a DLL under
/// <c>plugins/.{name}</c>, and hands that DLL to <see cref="IGrpcServiceDescriptor"/>.
/// </summary>
[Serializable]
public class GrpcCodeGeneraterSubscriber : IEventSubscriber
{
    private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;
    private readonly ILogger<GrpcCodeGeneraterSubscriber> logger;
    private readonly IGrpcServiceDescriptor serviceDescriptor;
    private readonly IProtoGenerateCode protoGenerateCode;

    /// <summary>Creates the subscriber with its logger, descriptor registry and protoc wrapper.</summary>
    public GrpcCodeGeneraterSubscriber(ILogger<GrpcCodeGeneraterSubscriber> logger, IGrpcServiceDescriptor serviceDescriptor, IProtoGenerateCode protoGenerateCode)
    {
        this.logger = logger;
        this.serviceDescriptor = serviceDescriptor;
        this.protoGenerateCode = protoGenerateCode;
    }

    /// <summary>
    /// Handles the "GrpcCodeGenerater" event. The event payload is expected to be a <c>string[]</c> of
    /// .proto file paths; any other payload (or an empty array) is ignored.
    /// </summary>
    /// <param name="context">Event context whose <c>Source.Payload</c> carries the changed proto paths.</param>
    /// <remarks>
    /// Processing stops at the first proto file whose DLL cannot be built. The generated .cs files are
    /// only deleted after the whole batch has been processed successfully.
    /// </remarks>
    [EventSubscribe("GrpcCodeGenerater")]
    public async Task GrpcCodeGenerater(EventHandlerExecutingContext context)
    {
        var protoFiles = context.Source.Payload as string[];
        if (protoFiles == null || protoFiles.Length == 0)
            return;

        this.protoGenerateCode.GenerateCsharpFromProto(protoFiles);

        foreach (var protoFile in protoFiles)
        {
            var assemblyName = Path.GetFileNameWithoutExtension(protoFile);
            if (!GenerateDll(assemblyName))
                return;

            var outputDirectory = Path.Combine(BaseDirectory, $"plugins/.{assemblyName}");
            // Marker file holding the proto file's last write time, stored next to the compiled DLL.
            File.WriteAllText(Path.Combine(outputDirectory, $"plugin.txt"), File.GetLastWriteTime(protoFile).ToString("yyyy-MM-dd HH:mm:ss"));
            await this.serviceDescriptor.CreateGrpcDescriptorAsync(Path.Combine(outputDirectory, $"{assemblyName}.dll"));
        }

        this.logger.LogInformation($" generater dll compeleted:{string.Join<string>(",", protoFiles.Select(y => Path.GetFileName(y)))} ");

        // The intermediate sources are no longer needed once every DLL has been produced.
        foreach (var generatedSource in Directory.GetFiles(Path.Combine(BaseDirectory, "plugins"), "*.cs"))
            File.Delete(generatedSource);
    }

    /// <summary>
    /// Compiles <c>{assemblyName}.cs</c> and <c>{assemblyName}Grpc.cs</c> from the plugins folder into
    /// <c>plugins/.{assemblyName}/{assemblyName}.dll</c>.
    /// </summary>
    /// <param name="assemblyName">Proto file name without extension; also used as the assembly name.</param>
    /// <returns>
    /// <c>true</c> when the DLL was emitted without errors; <c>false</c> when no matching source file
    /// exists or compilation failed.
    /// </returns>
    private bool GenerateDll(string assemblyName)
    {
        var pluginsDirectory = Path.Combine(BaseDirectory, "plugins");
        var sourceFiles = Directory.GetFiles(pluginsDirectory, "*.cs");
        if (sourceFiles.Length == 0)
            return false;

        var grpcSourceName = string.Concat(assemblyName, "Grpc");
        var trees = new List<SyntaxTree>();
        foreach (var sourceFile in sourceFiles)
        {
            // Ordinal, case-insensitive match: file names are identifiers, not linguistic text,
            // so culture-dependent lower-casing must not influence the result.
            var sourceName = Path.GetFileNameWithoutExtension(sourceFile);
            var belongsToAssembly = string.Equals(sourceName, assemblyName, StringComparison.OrdinalIgnoreCase)
                || string.Equals(sourceName, grpcSourceName, StringComparison.OrdinalIgnoreCase);
            if (!belongsToAssembly)
                continue;

            trees.Add(CSharpSyntaxTree.ParseText(File.ReadAllText(sourceFile), encoding: Encoding.UTF8));
        }

        if (trees.Count == 0)
        {
            // Without this guard an empty assembly would be emitted and reported as a success.
            this.logger.LogError("No generated source named {AssemblyName}.cs or {GrpcSourceName}.cs was found in {Directory}.", assemblyName, grpcSourceName, pluginsDirectory);
            return false;
        }

        var references = new[]{
            MetadataReference.CreateFromFile(Assembly.Load("netstandard, Version=2.0.0.0").Location),
            MetadataReference.CreateFromFile(Assembly.Load("System.Runtime, Version=0.0.0.0").Location),
            MetadataReference.CreateFromFile(Assembly.Load("System.IO, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
            MetadataReference.CreateFromFile(Assembly.Load("System.Memory, Version=5.0.0.0, Culture=neutral, PublicKeyToken=cc7b13ffcd2ddd51").Location),
            MetadataReference.CreateFromFile(Assembly.Load("System.Threading.Tasks, Version=4.0.10.0, Culture=neutral, PublicKeyToken=b03f5f7f11d50a3a").Location),
            MetadataReference.CreateFromFile(typeof(object).Assembly.Location),
            MetadataReference.CreateFromFile(typeof(Google.Protobuf.ProtoPreconditions).Assembly.Location),
            MetadataReference.CreateFromFile(typeof(SerializationContext).Assembly.Location),
            MetadataReference.CreateFromFile(typeof(Channel).Assembly.Location)
        };
        var options = new CSharpCompilationOptions(outputKind: OutputKind.DynamicallyLinkedLibrary, optimizationLevel: OptimizationLevel.Debug, generalDiagnosticOption: ReportDiagnostic.Error);

        var dllDirectory = Path.Combine(pluginsDirectory, $".{assemblyName}");
        Directory.CreateDirectory(dllDirectory); // no-op when the directory already exists

        var emitResult = CSharpCompilation.Create(assemblyName, trees, references, options).Emit(Path.Combine(dllDirectory, $"{assemblyName}.dll"));

        // The diagnostics are passed as a log argument, not as the template: compiler messages may
        // contain '{' or '}' which must not be interpreted as format placeholders.
        var diagnostics = string.Join(",", emitResult.Diagnostics.Select(d => string.Format("[{0}]:{1}({2})", d.Id, d.GetMessage(), d.Location.GetLineSpan().StartLinePosition)));
        this.logger.Log(emitResult.Success ? LogLevel.Debug : LogLevel.Error, "{Diagnostics}", diagnostics);
        return emitResult.Success;
    }
}
// Purpose: call the protoc tool to generate C# code. This example generates xxx.cs and xxxGrpc.cs.
/// <summary>
/// Runs the bundled <c>protoc</c> compiler together with <c>grpc_csharp_plugin</c> to generate C#
/// message and gRPC sources into the <c>plugins</c> folder under the application base directory.
/// </summary>
public class ProtoGenerateCode : IProtoGenerateCode
{
    /// <summary>Maximum time protoc may run before it is terminated.</summary>
    private const int ProtocTimeoutMilliseconds = 5000;

    /// <summary>Maximum time to wait for each redirected output stream to be read completely.</summary>
    private const int OutputDrainTimeoutMilliseconds = 1000;

    private static readonly string BaseDirectory = AppDomain.CurrentDomain.BaseDirectory;
    private readonly ILogger<ProtoGenerateCode> Logger;

    /// <summary>Creates the generator with the logger used for progress and protoc output.</summary>
    public ProtoGenerateCode(ILogger<ProtoGenerateCode> logger)
    {
        this.Logger = logger;
    }

    /// <summary>
    /// Generates C# code for all given .proto files in a single protoc invocation.
    /// </summary>
    /// <param name="protoPath">Paths of the .proto files; null/blank entries are ignored.</param>
    /// <exception cref="InvalidOperationException">
    /// protoc exited with a non-zero exit code or had to be terminated after the timeout.
    /// </exception>
    /// <remarks>
    /// On an unsupported operating system an error is logged and nothing is generated.
    /// Output that protoc writes to stderr while still exiting with code 0 (warnings) is logged
    /// instead of being treated as a failure.
    /// </remarks>
    public void GenerateCsharpFromProto(params string[] protoPath)
    {
        var protoFiles = (protoPath ?? new string[0]).Where(p => !string.IsNullOrWhiteSpace(p)).ToArray();
        if (protoFiles.Length == 0)
            return;

        string os;
        var bin = string.Empty;
        if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
        {
            os = "windows";
            bin = ".exe";
        }
        else if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
        {
            os = "linux";
        }
        else if (RuntimeInformation.IsOSPlatform(OSPlatform.OSX))
        {
            os = "macosx";
        }
        else
        {
            Logger.LogError(" This platform does not support grpctools.");
            return;
        }

        // OS architecture (x86, x64, ...) selects the tools sub-folder, e.g. tools/linux_x64.
        var architecture = RuntimeInformation.OSArchitecture.ToString().ToLowerInvariant();
        var protocPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/protoc{bin}");
        var grpcPath = Path.Combine(BaseDirectory, $"tools/{os}_{architecture}/grpc_csharp_plugin{bin}");

        var outdir = Path.Combine(BaseDirectory, "plugins");
        Directory.CreateDirectory(outdir); // no-op when the directory already exists

        var argsValue = BuildArguments(protoFiles, outdir, grpcPath);
        Logger.LogInformation("Running: {Protoc} {Arguments}", protocPath, argsValue);

        var exitCode = RunProtoc(protocPath, argsValue, string.Empty, out _, out string stderr);
        if (exitCode != 0)
            throw new InvalidOperationException(string.IsNullOrWhiteSpace(stderr) ? $"protoc exited with code {exitCode}." : stderr);

        if (!string.IsNullOrWhiteSpace(stderr))
            Logger.LogWarning("protoc reported: {Output}", stderr);
    }

    /// <summary>
    /// Builds the protoc command line: one <c>--proto_path</c> per distinct source directory, the C#
    /// and gRPC output options, then the proto files. Every token is quoted so that paths containing
    /// spaces survive argument splitting.
    /// </summary>
    private static string BuildArguments(string[] protoFiles, string outdir, string grpcPath)
    {
        var tokens = new List<string>();

        var importDirectories = protoFiles
            // A bare file name has no directory part; protoc rejects an empty --proto_path and
            // asks for "." to denote the current directory.
            .Select(file => Path.GetDirectoryName(file))
            .Select(directory => string.IsNullOrEmpty(directory) ? "." : directory)
            .Distinct(StringComparer.OrdinalIgnoreCase);
        foreach (var directory in importDirectories)
            tokens.Add(Quote("--proto_path=" + directory));

        tokens.Add(Quote("--csharp_out=" + outdir));
        tokens.Add(Quote("--plugin=protoc-gen-grpc=" + grpcPath));
        tokens.Add(Quote("--grpc_out=" + outdir));

        foreach (var file in protoFiles)
            tokens.Add(Quote(file));

        return string.Join(" ", tokens);
    }

    /// <summary>
    /// Wraps a single command-line token in double quotes. Trailing backslashes are doubled so that
    /// they do not escape the closing quote.
    /// </summary>
    private static string Quote(string token)
    {
        var trailingBackslashes = token.Length - token.TrimEnd('\\').Length;
        return "\"" + token + new string('\\', trailingBackslashes) + "\"";
    }

    /// <summary>
    /// Starts the given executable with redirected output and waits for it to finish.
    /// </summary>
    /// <param name="path">Executable to start.</param>
    /// <param name="arguments">Command-line arguments.</param>
    /// <param name="workingDir">Working directory; ignored when null or empty.</param>
    /// <param name="stdout">Captured standard output (empty when it could not be read in time).</param>
    /// <param name="stderr">Captured standard error, prefixed with a timeout notice when the process was terminated.</param>
    /// <returns>The process exit code, or -1 when the process timed out or could not be stopped.</returns>
    static int RunProtoc(string path, string arguments, string workingDir, out string stdout, out string stderr)
    {
        using (var proc = new Process())
        {
            var psi = proc.StartInfo;
            psi.FileName = path;
            psi.Arguments = arguments;
            if (!string.IsNullOrEmpty(workingDir))
                psi.WorkingDirectory = workingDir;
            psi.RedirectStandardError = psi.RedirectStandardOutput = true;
            psi.UseShellExecute = false;
            psi.CreateNoWindow = true;
            proc.Start();

            // Read both streams asynchronously so a full pipe buffer cannot block the child process.
            var stdoutTask = proc.StandardOutput.ReadToEndAsync();
            var stderrTask = proc.StandardError.ReadToEndAsync();

            var timedOut = !proc.WaitForExit(ProtocTimeoutMilliseconds);
            if (timedOut)
            {
                try
                {
                    proc.Kill();
                    // Kill only requests termination; give the process a moment to actually exit.
                    proc.WaitForExit(OutputDrainTimeoutMilliseconds);
                }
                catch (Exception)
                {
                    // Best effort: the process may have exited on its own in the meantime.
                }
            }

            stdout = stdoutTask.Wait(OutputDrainTimeoutMilliseconds) ? stdoutTask.Result : "";
            stderr = stderrTask.Wait(OutputDrainTimeoutMilliseconds) ? stderrTask.Result : "";

            if (timedOut)
            {
                stderr = $"protoc did not exit within {ProtocTimeoutMilliseconds} ms and was terminated. {stderr}";
                return -1;
            }

            // ExitCode throws while the process is still running, so guard the access.
            return proc.HasExited ? proc.ExitCode : -1;
        }
    }
}
// Re-implements the interface in order to forward gRPC requests.
// The scheme is tested with IndexOf("grpc") rather than an exact match, because the request may also be made over HTTPS.
// 1000 is added to the port number because the downstream services also serve HTTP; the gRPC port is therefore the HTTP port plus 1000.
/// <summary>
/// Ocelot <see cref="IHttpRequester"/> that forwards requests whose downstream scheme contains
/// "grpc" to a gRPC service and handles every other request as a plain HTTP call.
/// </summary>
[Serializable]
public class GrpcHttpRequester : IHttpRequester
{
    /// <summary>Offset added to the downstream HTTP port to obtain the gRPC port.</summary>
    private const int GrpcPortOffset = 1000;

    /// <summary>
    /// Channels are created once per target and reused by all requests instead of opening (and never
    /// closing) a new connection for every call.
    /// NOTE(review): the request version is part of the key; if that value is client-controlled,
    /// consider bounding this cache.
    /// </summary>
    private static readonly System.Collections.Concurrent.ConcurrentDictionary<string, Lazy<Channel>> ChannelCache =
        new System.Collections.Concurrent.ConcurrentDictionary<string, Lazy<Channel>>(StringComparer.Ordinal);

    private readonly IHttpClientCache _cacheHandlers;
    private readonly IOcelotLogger _logger;
    private readonly IDelegatingHandlerHandlerFactory _factory;
    private readonly IExceptionToErrorMapper _mapper;

    /// <summary>Creates the requester with the Ocelot services needed for the HTTP fallback and error mapping.</summary>
    public GrpcHttpRequester(IOcelotLoggerFactory loggerFactory,
        IHttpClientCache cacheHandlers,
        IDelegatingHandlerHandlerFactory factory,
        IExceptionToErrorMapper mapper)
    {
        // Log under this class' own category rather than the stock HTTP requester's.
        this._logger = loggerFactory.CreateLogger<GrpcHttpRequester>();
        this._cacheHandlers = cacheHandlers;
        this._factory = factory;
        this._mapper = mapper;
    }

    /// <summary>
    /// Sends the downstream request stored in <paramref name="httpContext"/> and returns its response.
    /// </summary>
    /// <param name="httpContext">Current request context holding Ocelot's downstream request and route.</param>
    /// <returns>The downstream response, or an error response when the call failed.</returns>
    public async Task<Response<HttpResponseMessage>> GetResponse(HttpContext httpContext)
    {
        var downstreamRequest = httpContext.Items.DownstreamRequest();
        // Substring match (not equality) so that schemes merely containing "grpc" also qualify.
        var isGrpc = downstreamRequest.Scheme != null
            && downstreamRequest.Scheme.IndexOf("grpc", StringComparison.OrdinalIgnoreCase) >= 0;

        return isGrpc
            ? await ProcessGrpcResponse(httpContext)
            : await ProcessHttpResponse(httpContext);
    }

    /// <summary>
    /// Translates the HTTP request into a gRPC call. Falls back to plain HTTP when no gRPC method
    /// could be resolved for the request.
    /// </summary>
    private async Task<Response<HttpResponseMessage>> ProcessGrpcResponse(HttpContext httpContext)
    {
        try
        {
            GrpcRequestMessage grpcRequestMessage = await GrpcRequestMessage.FromReuqest(httpContext);
            if (grpcRequestMessage == null || grpcRequestMessage.GrpcRequestMethod == null)
                return await this.ProcessHttpResponse(httpContext);

            var downstreamRequest = httpContext.Items.DownstreamRequest();
            var grpcPort = Convert.ToInt32(downstreamRequest.Port) + GrpcPortOffset;
            var channel = GetOrCreateChannel(downstreamRequest.Host, grpcPort, grpcRequestMessage.RequestVersion);

            var client = new MethodDescriptorClient(channel);
            var httpResponseMessage = await client.InvokeAsync(grpcRequestMessage);
            return new OkResponse<HttpResponseMessage>(httpResponseMessage);
        }
        catch (RpcException exception)
        {
            var error = _mapper.Map(exception);
            return new OKButFailResponse<HttpResponseMessage>(error);
        }
        catch (Exception exception)
        {
            var error = _mapper.Map(exception);
            return new ErrorResponse<HttpResponseMessage>(error);
        }
    }

    /// <summary>
    /// Returns the shared insecure channel for the given target, creating it on first use.
    /// </summary>
    private static Channel GetOrCreateChannel(string host, int port, string requestVersion)
    {
        var key = $"{host}:{port}|{requestVersion}";
        // Lazy guarantees that only one Channel is ever constructed per key, even when several
        // requests race on the first call.
        var lazyChannel = ChannelCache.GetOrAdd(key, _ => new Lazy<Channel>(() =>
        {
            var options = new List<ChannelOption> { new ChannelOption("keepalive_time_ms", 60000) };
            if (!string.IsNullOrEmpty(requestVersion))
                options.Add(new ChannelOption("requestVersion", requestVersion));
            return new Channel(host, port, ChannelCredentials.Insecure, options);
        }));
        return lazyChannel.Value;
    }

    /// <summary>
    /// Forwards the downstream request over HTTP using Ocelot's <c>HttpClientBuilder</c>.
    /// </summary>
    private async Task<Response<HttpResponseMessage>> ProcessHttpResponse(HttpContext httpContext)
    {
        var builder = new HttpClientBuilder(_factory, _cacheHandlers, _logger);
        var downstreamRoute = httpContext.Items.DownstreamRoute();
        var downstreamRequest = httpContext.Items.DownstreamRequest();
        var httpClient = builder.Create(downstreamRoute);
        try
        {
            var response = await httpClient.SendAsync(downstreamRequest.ToHttpRequestMessage(), httpContext.RequestAborted);
            return new OkResponse<HttpResponseMessage>(response);
        }
        catch (Exception exception)
        {
            var error = _mapper.Map(exception);
            return new ErrorResponse<HttpResponseMessage>(error);
        }
        finally
        {
            builder.Save();
        }
    }
}
// Purpose: performs the gRPC request; see the InvokeAsync method for details.
/// <summary>
/// Generic gRPC client that invokes an arbitrary method described by a protobuf method descriptor:
/// the JSON request body is deserialized into the method's input type, the call is made through the
/// base class' <c>CallInvoker</c>, and the result is returned as a JSON <see cref="HttpResponseMessage"/>.
/// </summary>
public class MethodDescriptorClient : ClientBase<MethodDescriptorClient>
{
    /// <summary>Open generic definition of <c>CallGrpcAsyncCore</c>, looked up once instead of per call.</summary>
    private static readonly System.Reflection.MethodInfo CallGrpcAsyncCoreMethod =
        typeof(MethodDescriptorClient).GetMethod(nameof(CallGrpcAsyncCore), System.Reflection.BindingFlags.Instance | System.Reflection.BindingFlags.NonPublic);

    /// <summary>Creates a client that sends its calls over <paramref name="channel"/>.</summary>
    public MethodDescriptorClient(Channel channel)
        : base(channel)
    {
    }

    /// <summary>Creates a client that sends its calls through <paramref name="callInvoker"/>.</summary>
    public MethodDescriptorClient(CallInvoker callInvoker)
        : base(callInvoker)
    {
    }

    /// <summary>Parameterless constructor forwarding to the base class.</summary>
    public MethodDescriptorClient()
        : base()
    {
    }

    /// <summary>Creates a client from an existing client configuration.</summary>
    protected MethodDescriptorClient(ClientBaseConfiguration configuration)
        : base(configuration)
    {
    }

    /// <inheritdoc />
    protected override MethodDescriptorClient NewInstance(ClientBaseConfiguration configuration)
    {
        return new MethodDescriptorClient(configuration);
    }

    /// <summary>
    /// Invokes the gRPC method referenced by <paramref name="grpcRequestMessage"/>.
    /// </summary>
    /// <param name="grpcRequestMessage">Request carrying the method descriptor, headers and JSON body.</param>
    /// <returns>An HTTP 200 response whose content is the JSON-serialized gRPC response.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="grpcRequestMessage"/> is null.</exception>
    public Task<HttpResponseMessage> InvokeAsync(GrpcRequestMessage grpcRequestMessage)
    {
        if (grpcRequestMessage == null)
            throw new ArgumentNullException(nameof(grpcRequestMessage));

        // The request/response CLR types are only known at run time, so the generic core method
        // has to be closed over them via reflection.
        var methodDescriptor = grpcRequestMessage.GrpcRequestMethod;
        var closedMethod = CallGrpcAsyncCoreMethod.MakeGenericMethod(methodDescriptor.InputType.ClrType, methodDescriptor.OutputType.ClrType);
        return (Task<HttpResponseMessage>)closedMethod.Invoke(this, new object[] { grpcRequestMessage });
    }

    /// <summary>
    /// Strongly typed implementation behind <see cref="InvokeAsync"/>: builds the request message,
    /// dispatches on the method type (unary / client / server / duplex streaming) and converts the
    /// outcome into an HTTP response.
    /// </summary>
    /// <exception cref="NotSupportedException">The method type is not one of the four gRPC method types.</exception>
    private async Task<HttpResponseMessage> CallGrpcAsyncCore<TRequest, TResponse>(GrpcRequestMessage grpcRequestMessage) where TRequest : class, IMessage<TRequest> where TResponse : class, IMessage<TResponse>
    {
        CallOptions option = CreateCallOptions(grpcRequestMessage.Headers);
        var rpc = GrpcMethodBuilder<TRequest, TResponse>.GetMethod(grpcRequestMessage.GrpcRequestMethod);

        // A request without a body (or with an empty/"null" body) is sent as an empty message
        // instead of failing on a null request object.
        var content = grpcRequestMessage.RequestMessage?.Content;
        var requestJson = content == null ? null : await content.ReadAsStringAsync();
        TRequest request = string.IsNullOrWhiteSpace(requestJson) ? null : JsonConvert.DeserializeObject<TRequest>(requestJson);
        if (request == null)
            request = Activator.CreateInstance<TRequest>();

        // The HTTP request carries exactly one message; streaming calls send it as a one-element stream.
        List<TRequest> requests = new List<TRequest>() { request };
        switch (rpc.Type)
        {
            case MethodType.Unary:
                var unaryResult = await AsyncUnaryCall(CallInvoker, rpc, option, request);
                return await ProcessHttpResponseMessage(unaryResult.Item2, unaryResult.Item1);
            case MethodType.ClientStreaming:
                var clientStreamingResult = await AsyncClientStreamingCall(CallInvoker, rpc, option, requests);
                return await ProcessHttpResponseMessage(clientStreamingResult.Item2, clientStreamingResult.Item1);
            case MethodType.ServerStreaming:
                // Item1 is the list of streamed responses and is passed as a single element.
                var serverStreamingResult = await AsyncServerStreamingCall(CallInvoker, rpc, option, request);
                return await ProcessHttpResponseMessage(serverStreamingResult.Item2, serverStreamingResult.Item1);
            case MethodType.DuplexStreaming:
                var duplexStreamingResult = await AsyncDuplexStreamingCall(CallInvoker, rpc, option, requests);
                return await ProcessHttpResponseMessage(duplexStreamingResult.Item2, duplexStreamingResult.Item1);
            default:
                throw new NotSupportedException($"MethodType '{rpc.Type}' is not supported.");
        }
    }

    /// <summary>
    /// Builds an HTTP 200 response whose body is <paramref name="responses"/> serialized as JSON and
    /// whose headers mirror the gRPC response headers.
    /// </summary>
    /// <param name="headers">gRPC response headers to copy onto the HTTP response.</param>
    /// <param name="responses">Response objects; always serialized as a JSON array.</param>
    private Task<HttpResponseMessage> ProcessHttpResponseMessage<TResponse>(Metadata headers, params TResponse[] responses)
    {
        HttpResponseMessage httpResponseMessage = new HttpResponseMessage(HttpStatusCode.OK);
        httpResponseMessage.Content = new StringContent(JsonConvert.SerializeObject(responses), System.Text.Encoding.UTF8, "application/json");
        if (headers != null)
        {
            foreach (var entry in headers)
            {
                // Binary ("-bin") entries have no string value; forward them base64-encoded.
                var value = entry.IsBinary ? Convert.ToBase64String(entry.ValueBytes) : entry.Value;
                // Skip (rather than throw on) names/values HttpResponseMessage does not accept as
                // response headers, so one odd metadata entry cannot fail the whole request.
                httpResponseMessage.Headers.TryAddWithoutValidation(entry.Key, value);
            }
        }
        return Task.FromResult(httpResponseMessage);
    }

    /// <summary>
    /// Converts the incoming HTTP headers into gRPC call metadata. A "grpc." substring in a header
    /// name is removed; only the first value of each header is forwarded.
    /// </summary>
    private CallOptions CreateCallOptions(HttpRequestHeaders headers)
    {
        Metadata meta = new Metadata();
        foreach (var entry in headers)
            meta.Add(entry.Key.Replace("grpc.", ""), entry.Value.FirstOrDefault() ?? string.Empty);
        return new CallOptions(meta);
    }

    /// <summary>
    /// Performs a unary call: one request, one response.
    /// </summary>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <param name="invoker">Invoker used to start the call.</param>
    /// <param name="method">gRPC method definition.</param>
    /// <param name="option">Call options (metadata).</param>
    /// <param name="request">Request message.</param>
    /// <returns>The response together with the response headers.</returns>
    private async Task<Tuple<TResponse, Metadata>> AsyncUnaryCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request) where TRequest : class where TResponse : class
    {
        using (AsyncUnaryCall<TResponse> call = invoker.AsyncUnaryCall(method, null, option, request))
        {
            return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
        }
    }

    /// <summary>
    /// Performs a client-streaming call: all requests are written, the stream is completed, and a
    /// single response is awaited.
    /// </summary>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <param name="invoker">Invoker used to start the call.</param>
    /// <param name="method">gRPC method definition.</param>
    /// <param name="option">Call options (metadata).</param>
    /// <param name="requests">Request messages to stream; may be null.</param>
    /// <returns>The response together with the response headers.</returns>
    private async Task<Tuple<TResponse, Metadata>> AsyncClientStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests) where TRequest : class where TResponse : class
    {
        using (AsyncClientStreamingCall<TRequest, TResponse> call = invoker.AsyncClientStreamingCall(method, null, option))
        {
            if (requests != null)
            {
                foreach (TRequest request in requests)
                    await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
            }
            await call.RequestStream.CompleteAsync().ConfigureAwait(false);
            return Tuple.Create(await call.ResponseAsync, await call.ResponseHeadersAsync);
        }
    }

    /// <summary>
    /// Performs a server-streaming call: one request is sent and every streamed response is
    /// collected into a list before returning.
    /// </summary>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <param name="invoker">Invoker used to start the call.</param>
    /// <param name="method">gRPC method definition.</param>
    /// <param name="option">Call options (metadata).</param>
    /// <param name="request">Request message.</param>
    /// <returns>All responses together with the response headers.</returns>
    private async Task<Tuple<IList<TResponse>, Metadata>> AsyncServerStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, TRequest request) where TRequest : class where TResponse : class
    {
        using (AsyncServerStreamingCall<TResponse> call = invoker.AsyncServerStreamingCall(method, null, option, request))
        {
            IList<TResponse> responses = new List<TResponse>();
            while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
                responses.Add(call.ResponseStream.Current);
            return Tuple.Create(responses, await call.ResponseHeadersAsync);
        }
    }

    /// <summary>
    /// Performs a duplex-streaming call: all requests are written and the request stream completed
    /// first, then every streamed response is collected into a list.
    /// </summary>
    /// <typeparam name="TRequest">Request message type.</typeparam>
    /// <typeparam name="TResponse">Response message type.</typeparam>
    /// <param name="invoker">Invoker used to start the call.</param>
    /// <param name="method">gRPC method definition.</param>
    /// <param name="option">Call options (metadata).</param>
    /// <param name="requests">Request messages to stream; may be null.</param>
    /// <returns>All responses together with the response headers.</returns>
    private async Task<Tuple<IList<TResponse>, Metadata>> AsyncDuplexStreamingCall<TRequest, TResponse>(CallInvoker invoker, Method<TRequest, TResponse> method, CallOptions option, IEnumerable<TRequest> requests) where TRequest : class where TResponse : class
    {
        using (AsyncDuplexStreamingCall<TRequest, TResponse> call = invoker.AsyncDuplexStreamingCall(method, null, option))
        {
            if (requests != null)
            {
                foreach (TRequest request in requests)
                    await call.RequestStream.WriteAsync(request).ConfigureAwait(false);
            }
            await call.RequestStream.CompleteAsync().ConfigureAwait(false);
            IList<TResponse> responses = new List<TResponse>();
            while (await call.ResponseStream.MoveNext().ConfigureAwait(false))
                responses.Add(call.ResponseStream.Current);
            return Tuple.Create(responses, await call.ResponseHeadersAsync);
        }
    }
}
When the MethodDescriptorClient class reads the request parameters, only StringContent is currently supported (when HttpClient POSTs to the back end, what the back end receives is ByteArrayContent).
The client therefore calls the gateway as follows:
using (var http = new HttpClient())
{
    // Make sure the base address ends with a slash before the route segments are appended.
    var baseUrl = queryUrl();
    if (!baseUrl.EndsWith("/"))
        baseUrl += "/";

    // The query is posted as a JSON string body (StringContent); a MultipartFormDataContent
    // body with a "WhereString" part was tried here before and is not used.
    var payload = new { WhereString = " no like %'1.402.03.01674'%" };
    var payloadJson = JsonConvert.SerializeObject(payload);
    var body = new StringContent(payloadJson, Encoding.UTF8, "application/json");

    // Route prefix selects the gateway's gRPC path or the plain HTTP API.
    var routePrefix = usegrpc ? "grpc" : "api";
    var requestUri = string.Concat(baseUrl, routePrefix, "/", "Supplier", "/", "QuerySupplier");

    // Blocks until the response and its body have arrived.
    var response = http.PostAsync(requestUri, body).GetAwaiter().GetResult();
    var responseBody = response.Content.ReadAsStringAsync().GetAwaiter().GetResult();
    this.textBox1.Text = JsonConvert.SerializeObject(responseBody);
}
The gateway is registered as follows:
/// <summary>
/// Registers Ocelot and, on top of the returned builder, the gRPC gateway extension.
/// </summary>
/// <param name="services">Service collection of the gateway application.</param>
public void ConfigureServices(IServiceCollection services)
{
    var ocelotBuilder = services.AddOcelot();
    ocelotBuilder.AddOcelotGrpc();
}
The protoc program must be deployed together with the gateway service. The second directory holds the .proto files.
Git repository: Sam/Ocelot.Provider.RPC
版权声明
本文为[Tassel 1990]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/04/202204231359253190.html
边栏推荐
- Pseudo Distributed installation spark
- 【解决报错】Error in v-on handler: “TypeError: Cannot read property ‘resetFields’ of undefined”
- Get the column name list of the table quickly in Oracle
- 磁盘管理与文件系统
- PyMySQL
- Phpstudy V8, a commonly used software for station construction 1 graphic installation tutorial (Windows version) super detailed
- 计组 | 【七 输入/输出系统】知识点与例题
- MySql主从复制
- Production environment——
- Change the password after installing MySQL in Linux
猜你喜欢
Pycham connects to the remote server and realizes remote debugging
Selenium IDE and XPath installation of chrome plug-in
Mock test
Detailed explanation of the penetration of network security in the shooting range
【PIMF】OpenHarmony啃论文俱乐部—在ACM Survey闲逛是什么体验
vim编辑器的实时操作
[pimf] openharmony paper Club - what is the experience of wandering in ACM survey
Smart doc + Torna generate interface document
TypeError: set_ figure_ params() got an unexpected keyword argument ‘figsize‘
SQL database
随机推荐
无线鹅颈麦主播麦手持麦无线麦克风方案应当如何选择
Sub database and sub table & shardingsphere
Phpstudy V8, a commonly used software for station construction 1 graphic installation tutorial (Windows version) super detailed
Rtklib 2.4.3 source code Notes
杂文 谈谈古典的《拆掉思维里的墙》
Smart doc + Torna generate interface document
Website_ Collection
websocket
英语 | Day15、16 x 句句真研每日一句(从句断开、修饰)
VLAN advanced technology, VLAN aggregation, super VLAN, sub VLAN
Creation of RAID disk array and RAID5
Custom implementation of Baidu image recognition (instead of aipocr)
拷贝构造函数 浅拷贝与深拷贝
JMeter installation tutorial and solutions to the problems I encountered
[pimf] openharmony paper Club - what is the experience of wandering in ACM survey
Solution of garbled code on idea console
Mock test
安装及管理程序
蓝桥杯省一之路06——第十二届省赛真题第二场
SQL: How to parse Microsoft Transact-SQL Statements in C# and to match the column aliases of a view