はじめに
先日、WCF から gRPC に移行するときの手段として、.proto ファイルを書くのではなく、protobuf-net.Grpc を使って C# のコードから gRPC のクライアントとサーバーを動的に生成する方法を紹介した。コードファースト ASP.NET Core gRPC と命名。
試したのは 1 リクエストで 1 レスポンスを返す、gRPC でいうところの Unary 方式。 WCF には使いにくいけど Streaming がある。 マジ使いにくいけど。 一方で、gRPC には Client Streaming、Server Streaming、Duplex Streaming がある。 コードファーストで gRPC をやる場合に、Streaming はどうやるのか試してみた。
サービスコントラクトとデータコントラクト
.NET Core ライブラリプロジェクトで、WCF のときみたいにサービスコントラクトとデータコントラクトを定義する。
Streaming は IAsyncEnumerable<T>
で表現。
Client Streaming は引数、Server Streaming は返り値、Duplex Streaming はその両方で型が IAsyncEnumerable<T>
になる。
using System.Collections.Generic; using System.Runtime.Serialization; using System.ServiceModel; using System.Threading.Tasks; namespace CodeFirstGrpc.Shared { [ServiceContract] public interface IGreetingService { ValueTask<HelloReply> HelloAsync(HelloRequest request); IAsyncEnumerable<HelloReply> ServerStreamingHelloAsync(HelloRequest requests); IAsyncEnumerable<HelloReply> DuplexStreamingHelloAsync(IAsyncEnumerable<HelloRequest> requests); } [DataContract] public class HelloRequest { [DataMember(Order = 1)] public string Name { get; set; } } [DataContract] public class HelloReply { [DataMember(Order = 1)] public string Message { get; set; } } }
gRPC サービスの実装
async なメソッド内での yield return
や await foreach
を使って、Server Streaming と Duplex Streaming を実装。
C# らしい書き味で筋が良いと感じる。
using System; using System.Collections.Generic; using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; using CodeFirstGrpc.Shared; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Hosting; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Hosting; using ProtoBuf.Grpc.Server; namespace CodeFirstGrpc { public class Program { public static void Main(string[] args) { CreateHostBuilder(args).Build().Run(); } public static IHostBuilder CreateHostBuilder(string[] args) => Host.CreateDefaultBuilder(args) .ConfigureWebHostDefaults(webBuilder => { webBuilder.UseStartup<Startup>(); }); } public class Startup { public void ConfigureServices(IServiceCollection services) { services.AddCodeFirstGrpc(); } public void Configure(IApplicationBuilder app, IWebHostEnvironment env) { if (env.IsDevelopment()) { app.UseDeveloperExceptionPage(); } app.UseRouting(); app.UseEndpoints(endpoints => { endpoints.MapGrpcService<GreetingService>(); }); } } public class GreetingService : IGreetingService { public ValueTask<HelloReply> HelloAsync(HelloRequest request) { return new ValueTask<HelloReply>(new HelloReply { Message = $"Hello, {request.Name}.", }); } public IAsyncEnumerable<HelloReply> ServerStreamingHelloAsync(HelloRequest request) => ServerStreamHelloAsyncImpl(request, default); async IAsyncEnumerable<HelloReply> ServerStreamHelloAsyncImpl( HelloRequest request, [EnumeratorCancellation] CancellationToken cancellationToken) { for (var i = 0; i < 5; i++) { if (!cancellationToken.IsCancellationRequested) { yield return new HelloReply { Message = $"Hello, {request.Name}.", }; await Task.Delay(TimeSpan.FromSeconds(1)); } } } public IAsyncEnumerable<HelloReply> DuplexStreamingHelloAsync(IAsyncEnumerable<HelloRequest> requests) => DuplexStreamHelloAsyncImpl(requests, default); async IAsyncEnumerable<HelloReply> DuplexStreamHelloAsyncImpl( IAsyncEnumerable<HelloRequest> requests, [EnumeratorCancellation] CancellationToken cancellationToken) { await foreach (var request in requests) { await Task.Delay(TimeSpan.FromSeconds(1)); if (!cancellationToken.IsCancellationRequested) { yield return new HelloReply { Message = $"Hello, {request.Name}.", }; } } } } }
gRPC クライアントの実装
Server Streaming を await foreach
で処理できるのが実に自然。
Duplex Streaming の Client から Server に送る方、つまりは Client Streaming だけど、
IAsyncEnumerable<T>
を作って渡すところが馴れない。
自分が IAsyncEnumerable<T>
を触り始めたばかりなのもあるが、
もうちょっと上手く書けないか試行錯誤の余地がありそう。
using System; using System.Collections.Generic; using System.Threading.Tasks; using CodeFirstGrpc.Shared; using Grpc.Net.Client; using ProtoBuf.Grpc.Client; namespace CodeFirstGrpc.Client { class Program { static async Task Main(string[] args) { var channel = GrpcChannel.ForAddress("https://localhost:5001"); var client = channel.CreateGrpcService<IGreetingService>(); Console.WriteLine("=== Unary Test ==="); var reply = await client.HelloAsync(new HelloRequest { Name = "Kubo" }); Console.WriteLine(reply.Message); Console.WriteLine("=== Server Streaming Test ==="); try { await foreach (var x in client.ServerStreamingHelloAsync(new HelloRequest { Name = "Minamino" })) { Console.WriteLine(x.Message); } } catch (TaskCanceledException) { } Console.WriteLine("=== Duplex Streaming Test ==="); try { await foreach (var x in client.DuplexStreamingHelloAsync(GenerateRequests())) { Console.WriteLine(x.Message); } } catch (TaskCanceledException) { } Console.ReadLine(); } static async IAsyncEnumerable<HelloRequest> GenerateRequests() { yield return new HelloRequest { Name = "Douan" }; await Task.Delay(TimeSpan.FromSeconds(1)); yield return new HelloRequest { Name = "Osako" }; await Task.Delay(TimeSpan.FromSeconds(1)); yield return new HelloRequest { Name = "Nakajima" }; await Task.Delay(TimeSpan.FromSeconds(1)); yield return new HelloRequest { Name = "Sibasaki" }; await Task.Delay(TimeSpan.FromSeconds(1)); yield return new HelloRequest { Name = "Tomiyasu" }; } } }
実行
Streaming の様子がわかるように、アニメーション GIF を作成してみた。
まとめ
protobuf-net.Grpc は、gRPC の Client Streaming・Server Streaming・Duplex Streaming を、
IAsyncEnumerable<T>
を使って表現していて、実に C# らしくて筋が良いと感じた。
Unary だけでなく Streaming が思いのほか簡単に実装できてしまい、 WCF から gRPC に移行するための道筋がだいぶ見えてきた。 あとはヘッダーで認証に関する情報をやり取りできるかどうかを試せば、 調査終了にできそうだ。