はじめに
先日、WCF から gRPC に移行するときの手段として、.proto ファイルを書くのではなく、protobuf-net.Grpc を使って C# のコードから gRPC のクライアントとサーバーを動的に生成する方法を紹介した。コードファースト ASP.NET Core gRPC と命名。
tnakamura.hatenablog.com
試したのは 1 リクエストで 1 レスポンスを返す、gRPC でいうところの Unary 方式。
WCF には使いにくいけど Streaming がある。
マジ使いにくいけど。
一方で、gRPC には Client Streaming、Server Streaming、Duplex Streaming がある。
コードファーストで gRPC をやる場合に、Streaming はどうやるのか試してみた。
.NET Core ライブラリプロジェクトで、WCF のときみたいにサービスコントラクトとデータコントラクトを定義する。
Streaming は IAsyncEnumerable<T>
で表現。
Client Streaming は引数、Server Streaming は返り値、Duplex Streaming はその両方で型が IAsyncEnumerable<T>
になる。
using System.Collections.Generic;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.Threading.Tasks;
namespace CodeFirstGrpc.Shared
{
[ServiceContract]
public interface IGreetingService
{
ValueTask<HelloReply> HelloAsync(HelloRequest request);
IAsyncEnumerable<HelloReply> ServerStreamingHelloAsync(HelloRequest requests);
IAsyncEnumerable<HelloReply> DuplexStreamingHelloAsync(IAsyncEnumerable<HelloRequest> requests);
}
[DataContract]
public class HelloRequest
{
[DataMember(Order = 1)]
public string Name { get; set; }
}
[DataContract]
public class HelloReply
{
[DataMember(Order = 1)]
public string Message { get; set; }
}
}
gRPC サービスの実装
async なメソッド内での yield return
や await foreach
を使って、Server Streaming と Duplex Streaming を実装。
C# らしい書き味で筋が良いと感じる。
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using CodeFirstGrpc.Shared;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using ProtoBuf.Grpc.Server;
namespace CodeFirstGrpc
{
public class Program
{
public static void Main(string[] args)
{
CreateHostBuilder(args).Build().Run();
}
public static IHostBuilder CreateHostBuilder(string[] args) =>
Host.CreateDefaultBuilder(args)
.ConfigureWebHostDefaults(webBuilder =>
{
webBuilder.UseStartup<Startup>();
});
}
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
services.AddCodeFirstGrpc();
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapGrpcService<GreetingService>();
});
}
}
public class GreetingService : IGreetingService
{
public ValueTask<HelloReply> HelloAsync(HelloRequest request)
{
return new ValueTask<HelloReply>(new HelloReply
{
Message = $"Hello, {request.Name}.",
});
}
public IAsyncEnumerable<HelloReply> ServerStreamingHelloAsync(HelloRequest request) =>
ServerStreamHelloAsyncImpl(request, default);
async IAsyncEnumerable<HelloReply> ServerStreamHelloAsyncImpl(
HelloRequest request,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
for (var i = 0; i < 5; i++)
{
if (!cancellationToken.IsCancellationRequested)
{
yield return new HelloReply
{
Message = $"Hello, {request.Name}.",
};
await Task.Delay(TimeSpan.FromSeconds(1));
}
}
}
public IAsyncEnumerable<HelloReply> DuplexStreamingHelloAsync(IAsyncEnumerable<HelloRequest> requests) =>
DuplexStreamHelloAsyncImpl(requests, default);
async IAsyncEnumerable<HelloReply> DuplexStreamHelloAsyncImpl(
IAsyncEnumerable<HelloRequest> requests,
[EnumeratorCancellation] CancellationToken cancellationToken)
{
await foreach (var request in requests)
{
await Task.Delay(TimeSpan.FromSeconds(1));
if (!cancellationToken.IsCancellationRequested)
{
yield return new HelloReply
{
Message = $"Hello, {request.Name}.",
};
}
}
}
}
}
gRPC クライアントの実装
Server Streaming を await foreach
で処理できるのが実に自然。
Duplex Streaming の Client から Server に送る方、つまりは Client Streaming だけど、
IAsyncEnumerable<T>
を作って渡すところが馴れない。
自分が IAsyncEnumerable<T>
を触り始めたばかりなのもあるが、
もうちょっと上手く書けないか試行錯誤の余地がありそう。
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using CodeFirstGrpc.Shared;
using Grpc.Net.Client;
using ProtoBuf.Grpc.Client;
namespace CodeFirstGrpc.Client
{
class Program
{
static async Task Main(string[] args)
{
var channel = GrpcChannel.ForAddress("https://localhost:5001");
var client = channel.CreateGrpcService<IGreetingService>();
Console.WriteLine("=== Unary Test ===");
var reply = await client.HelloAsync(new HelloRequest
{
Name = "Kubo"
});
Console.WriteLine(reply.Message);
Console.WriteLine("=== Server Streaming Test ===");
try
{
await foreach (var x in client.ServerStreamingHelloAsync(new HelloRequest { Name = "Minamino" }))
{
Console.WriteLine(x.Message);
}
}
catch (TaskCanceledException) { }
Console.WriteLine("=== Duplex Streaming Test ===");
try
{
await foreach (var x in client.DuplexStreamingHelloAsync(GenerateRequests()))
{
Console.WriteLine(x.Message);
}
}
catch (TaskCanceledException) { }
Console.ReadLine();
}
static async IAsyncEnumerable<HelloRequest> GenerateRequests()
{
yield return new HelloRequest { Name = "Douan" };
await Task.Delay(TimeSpan.FromSeconds(1));
yield return new HelloRequest { Name = "Osako" };
await Task.Delay(TimeSpan.FromSeconds(1));
yield return new HelloRequest { Name = "Nakajima" };
await Task.Delay(TimeSpan.FromSeconds(1));
yield return new HelloRequest { Name = "Sibasaki" };
await Task.Delay(TimeSpan.FromSeconds(1));
yield return new HelloRequest { Name = "Tomiyasu" };
}
}
}
実行
Streaming の様子がわかるように、アニメーション GIF を作成してみた。
まとめ
protobuf-net.Grpc は、gRPC の Client Streaming・Server Streaming・Duplex Streaming を、
IAsyncEnumerable<T>
を使って表現していて、実に C# らしくて筋が良いと感じた。
Unary だけでなく Streaming が思いのほか簡単に実装できてしまい、
WCF から gRPC に移行するための道筋がだいぶ見えてきた。
あとはヘッダーで認証に関する情報をやり取りできるかどうかを試せば、
調査終了にできそうだ。