コードファースト ASP.NET Core gRPC ストリーミング編

はじめに

先日、WCF から gRPC に移行するときの手段として、.proto ファイルを書くのではなく、protobuf-net.Grpc を使って C# のコードから gRPC のクライアントとサーバーを動的に生成する方法を紹介した。コードファースト ASP.NET Core gRPC と命名

tnakamura.hatenablog.com

試したのは 1 リクエストで 1 レスポンスを返す、gRPC でいうところの Unary 方式。 WCF には使いにくいけど Streaming がある。 マジ使いにくいけど。 一方で、gRPC には Client Streaming、Server Streaming、Duplex Streaming がある。 コードファーストで gRPC をやる場合に、Streaming はどうやるのか試してみた。

サービスコントラクトとデータコントラクト

.NET Core ライブラリプロジェクトで、WCF のときみたいにサービスコントラクトとデータコントラクトを定義する。 Streaming は IAsyncEnumerable<T> で表現。 Client Streaming は引数、Server Streaming は返り値、Duplex Streaming はその両方で型が IAsyncEnumerable<T> になる。

using System.Collections.Generic;
using System.Runtime.Serialization;
using System.ServiceModel;
using System.Threading.Tasks;

namespace CodeFirstGrpc.Shared
{
    [ServiceContract]
    public interface IGreetingService
    {
        ValueTask<HelloReply> HelloAsync(HelloRequest request);

        IAsyncEnumerable<HelloReply> ServerStreamingHelloAsync(HelloRequest requests);

        IAsyncEnumerable<HelloReply> DuplexStreamingHelloAsync(IAsyncEnumerable<HelloRequest> requests);
    }

    [DataContract]
    public class HelloRequest
    {
        [DataMember(Order = 1)]
        public string Name { get; set; }
    }

    [DataContract]
    public class HelloReply
    {
        [DataMember(Order = 1)]
        public string Message { get; set; }
    }
}

gRPC サービスの実装

async なメソッド内での yield returnawait foreach を使って、Server Streaming と Duplex Streaming を実装。 C# らしい書き味で筋が良いと感じる。

using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
using CodeFirstGrpc.Shared;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using ProtoBuf.Grpc.Server;

namespace CodeFirstGrpc
{
    public class Program
    {
        public static void Main(string[] args)
        {
            CreateHostBuilder(args).Build().Run();
        }

        public static IHostBuilder CreateHostBuilder(string[] args) =>
            Host.CreateDefaultBuilder(args)
                .ConfigureWebHostDefaults(webBuilder =>
                {
                    webBuilder.UseStartup<Startup>();
                });
    }

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
            services.AddCodeFirstGrpc();
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapGrpcService<GreetingService>();
            });
        }
    }

    public class GreetingService : IGreetingService
    {

        public ValueTask<HelloReply> HelloAsync(HelloRequest request)
        {
            return new ValueTask<HelloReply>(new HelloReply
            {
                Message = $"Hello, {request.Name}.",
            });
        }

        public IAsyncEnumerable<HelloReply> ServerStreamingHelloAsync(HelloRequest request) =>
            ServerStreamHelloAsyncImpl(request, default);

        async IAsyncEnumerable<HelloReply> ServerStreamHelloAsyncImpl(
            HelloRequest request,
            [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            for (var i = 0; i < 5; i++)
            {
                if (!cancellationToken.IsCancellationRequested)
                {
                    yield return new HelloReply
                    {
                        Message = $"Hello, {request.Name}.",
                    };
                    await Task.Delay(TimeSpan.FromSeconds(1));
                }
            }
        }

        public IAsyncEnumerable<HelloReply> DuplexStreamingHelloAsync(IAsyncEnumerable<HelloRequest> requests) =>
            DuplexStreamHelloAsyncImpl(requests, default);

        async IAsyncEnumerable<HelloReply> DuplexStreamHelloAsyncImpl(
            IAsyncEnumerable<HelloRequest> requests,
            [EnumeratorCancellation] CancellationToken cancellationToken)
        {
            await foreach (var request in requests)
            {
                await Task.Delay(TimeSpan.FromSeconds(1));
                if (!cancellationToken.IsCancellationRequested)
                {
                    yield return new HelloReply
                    {
                        Message = $"Hello, {request.Name}.",
                    };
                }
            }
        }
    }
}

gRPC クライアントの実装

Server Streaming を await foreach で処理できるのが実に自然。 Duplex Streaming の Client から Server に送る方、つまりは Client Streaming だけど、 IAsyncEnumerable<T> を作って渡すところが馴れない。 自分が IAsyncEnumerable<T> を触り始めたばかりなのもあるが、 もうちょっと上手く書けないか試行錯誤の余地がありそう。

using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using CodeFirstGrpc.Shared;
using Grpc.Net.Client;
using ProtoBuf.Grpc.Client;

namespace CodeFirstGrpc.Client
{
    class Program
    {
        static async Task Main(string[] args)
        {
            var channel = GrpcChannel.ForAddress("https://localhost:5001");
            var client = channel.CreateGrpcService<IGreetingService>();

            Console.WriteLine("=== Unary Test ===");
            var reply = await client.HelloAsync(new HelloRequest
            {
                Name = "Kubo"
            });
            Console.WriteLine(reply.Message);

            Console.WriteLine("=== Server Streaming Test ===");
            try
            {
                await foreach (var x in client.ServerStreamingHelloAsync(new HelloRequest { Name = "Minamino" }))
                {
                    Console.WriteLine(x.Message);
                }

            }
            catch (TaskCanceledException) { }

            Console.WriteLine("=== Duplex Streaming Test ===");
            try
            {
                await foreach (var x in client.DuplexStreamingHelloAsync(GenerateRequests()))
                {
                    Console.WriteLine(x.Message);
                }

            }
            catch (TaskCanceledException) { }

            Console.ReadLine();
        }

        static async IAsyncEnumerable<HelloRequest> GenerateRequests()
        {
            yield return new HelloRequest { Name = "Douan" };
            await Task.Delay(TimeSpan.FromSeconds(1));
            yield return new HelloRequest { Name = "Osako" };
            await Task.Delay(TimeSpan.FromSeconds(1));
            yield return new HelloRequest { Name = "Nakajima" };
            await Task.Delay(TimeSpan.FromSeconds(1));
            yield return new HelloRequest { Name = "Sibasaki" };
            await Task.Delay(TimeSpan.FromSeconds(1));
            yield return new HelloRequest { Name = "Tomiyasu" };
        }
    }
}

実行

Streaming の様子がわかるように、アニメーション GIF を作成してみた。

f:id:griefworker:20191217115009g:plain

まとめ

protobuf-net.Grpc は、gRPC の Client Streaming・Server Streaming・Duplex Streaming を、 IAsyncEnumerable<T> を使って表現していて、実に C# らしくて筋が良いと感じた。

Unary だけでなく Streaming が思いのほか簡単に実装できてしまい、 WCF から gRPC に移行するための道筋がだいぶ見えてきた。 あとはヘッダーで認証に関する情報をやり取りできるかどうかを試せば、 調査終了にできそうだ。