Details

Accessing request metadata

By default the generated service interfaces don’t provide access to the request metadata, only to the request body (via the RPC method input parameter). If your methods require access to the request MetadataMetadata, you can configure Akka gRPC to generate server “power APIs” that extend the base service interfaces to provide an additional request metadata parameter to each service method. See the detailed chapters on sbt, Gradle and Maven for how to set this build option. Note that this option doesn’t effect the generated client stubs.

Notice: you need change GreeterServiceHandlerFactory to GreeterServiceHandlerFactoryPowerApiHandlerFactory.

Notice: you need change GreeterServiceHandler to GreeterServicePowerApiHandler.

Here’s an example implementation of these server power APIs:

Scala
package example.myapp.helloworld

import akka.NotUsed
import akka.grpc.scaladsl.Metadata
import akka.stream.Materializer
import akka.stream.scaladsl.{ Sink, Source }
import example.myapp.helloworld.grpc._

import scala.concurrent.Future

class PowerGreeterServiceImpl(materializer: Materializer) extends GreeterServicePowerApi {
  import materializer.executionContext
  private implicit val mat: Materializer = materializer

  override def sayHello(in: HelloRequest, metadata: Metadata): Future[HelloReply] = {
    val greetee = authTaggedName(in, metadata)
    println(s"sayHello to $greetee")
    Future.successful(HelloReply(s"Hello, $greetee"))
  }

  override def itKeepsTalking(in: Source[HelloRequest, NotUsed], metadata: Metadata): Future[HelloReply] = {
    println(s"sayHello to in stream...")
    in.runWith(Sink.seq)
      .map(elements => HelloReply(s"Hello, ${elements.map(authTaggedName(_, metadata)).mkString(", ")}"))
  }

  override def itKeepsReplying(in: HelloRequest, metadata: Metadata): Source[HelloReply, NotUsed] = {
    val greetee = authTaggedName(in, metadata)
    println(s"sayHello to $greetee with stream of chars...")
    Source(s"Hello, $greetee".toList).map(character => HelloReply(character.toString))
  }

  override def streamHellos(in: Source[HelloRequest, NotUsed], metadata: Metadata): Source[HelloReply, NotUsed] = {
    println(s"sayHello to stream...")
    in.map(request => HelloReply(s"Hello, ${authTaggedName(request, metadata)}"))
  }

  // Bare-bones just for GRPC metadata demonstration purposes
  private def isAuthenticated(metadata: Metadata): Boolean =
    metadata.getText("authorization").nonEmpty

  private def authTaggedName(in: HelloRequest, metadata: Metadata): String = {
    val authenticated = isAuthenticated(metadata)
    s"${in.name} (${if (!authenticated) "not " else ""}authenticated)"
  }
}
Java
package example.myapp.helloworld;

import akka.NotUsed;
import akka.grpc.javadsl.Metadata;
import akka.stream.Materializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import example.myapp.helloworld.grpc.GreeterServicePowerApi;
import example.myapp.helloworld.grpc.HelloReply;
import example.myapp.helloworld.grpc.HelloRequest;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;

public class GreeterServicePowerApiImpl implements GreeterServicePowerApi {
  private final Materializer mat;

  public GreeterServicePowerApiImpl(Materializer mat) {
   this.mat = mat;
  }

  @Override
  public CompletionStage<HelloReply> sayHello(HelloRequest in, Metadata metadata) {
    String greetee = authTaggedName(in, metadata);
    System.out.println("sayHello to " + greetee);
    HelloReply reply = HelloReply.newBuilder().setMessage("Hello, " + greetee).build();
    return CompletableFuture.completedFuture(reply);
  }

  @Override
  public CompletionStage<HelloReply> itKeepsTalking(Source<HelloRequest, NotUsed> in, Metadata metadata) {
    System.out.println("sayHello to in stream...");
    return in.runWith(Sink.seq(), mat)
      .thenApply(elements -> {
        String elementsStr = elements.stream().map(elem -> authTaggedName(elem, metadata))
            .collect(Collectors.toList()).toString();
        return HelloReply.newBuilder().setMessage("Hello, " + elementsStr).build();
      });
  }

  @Override
  public Source<HelloReply, NotUsed> itKeepsReplying(HelloRequest in, Metadata metadata) {
    String greetee = authTaggedName(in, metadata);
    System.out.println("sayHello to " + greetee + " with stream of chars");
    List<Character> characters = ("Hello, " + greetee)
        .chars().mapToObj(c -> (char) c).collect(Collectors.toList());
    return Source.from(characters)
      .map(character -> {
        return HelloReply.newBuilder().setMessage(String.valueOf(character)).build();
      });
  }

  @Override
  public Source<HelloReply, NotUsed> streamHellos(Source<HelloRequest, NotUsed> in, Metadata metadata) {
    System.out.println("sayHello to stream...");
    return in.map(request -> HelloReply.newBuilder().setMessage("Hello, " + authTaggedName(request, metadata)).build());
  }

  // Bare-bones just for GRPC metadata demonstration purposes
  private boolean isAuthenticated(Metadata metadata) {
    return metadata.getText("authorization").isPresent();
  }

  private String authTaggedName(HelloRequest in, Metadata metadata) {
    boolean authenticated = isAuthenticated(metadata);
    return String.format("%s (%sauthenticated)", in.getName(), isAuthenticated(metadata) ? "" : "not ");
  }
}

Status codes

To signal an error, you can fail the FutureCompletionStage or Source you are returning with a GrpcServiceExceptionGrpcServiceException containing the status code you want to return.

For an overview of gRPC status codes and their meaning see statuscodes.md.

For unary responses:

Scala
import akka.grpc.GrpcServiceException
import io.grpc.Status

val exceptionMetadata = new MetadataBuilder()
  .addText("test-text", "test-text-data")
  .addBinary("test-binary-bin", ByteString("test-binary-data"))
  .build()

// ...

def sayHello(in: HelloRequest): Future[HelloReply] = {
  if (in.name.isEmpty)
    Future.failed(
      new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription("No name found"), exceptionMetadata))
  else
    Future.successful(HelloReply(s"Hi ${in.name}!"))
}
Java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

import io.grpc.Status;
import akka.grpc.GrpcServiceException;

// ...

@Override
public CompletionStage<HelloReply> sayHello(HelloRequest in) {
    if (in.getName().isEmpty()) {
        CompletableFuture<HelloReply> future = new CompletableFuture<>();
        future.completeExceptionally(new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription("No name found")));
        return future;
    } else {
        return CompletableFuture.completedFuture(HelloReply.newBuilder().setMessage("Hi, " + in.getName()).build());
    }
}

For streaming responses:

Scala
import akka.NotUsed
import akka.stream.scaladsl.Source

import akka.grpc.GrpcServiceException
import io.grpc.Status

val exceptionMetadata = new MetadataBuilder()
  .addText("test-text", "test-text-data")
  .addBinary("test-binary-bin", ByteString("test-binary-data"))
  .build()

def itKeepsReplying(in: HelloRequest): Source[HelloReply, NotUsed] = {
  if (in.name.isEmpty)
    Source.failed(
      new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription("No name found"), exceptionMetadata))
  else
    myResponseSource
}
Java
import akka.NotUsed;
import akka.stream.javadsl.Source;

import io.grpc.Status;
import akka.grpc.GrpcServiceException;

// ...

@Override
public Source<HelloReply, NotUsed> itKeepsReplying(HelloRequest in) {
  if (in.getName().isEmpty()) {
        return Source.failed(new GrpcServiceException(Status.INVALID_ARGUMENT.withDescription("No name found")));
    } else {
        return myResponseSource;
    }
}
Found an error in this documentation? The source code for this page can be found here. Please feel free to edit and contribute a pull request.