亚马逊AWS官方博客

和Netflix一起探索基于DJL的 Java 分布式在线深度学习推理架构

Netflix 是世界上最大流媒体平台提供商。它拥有多个知名影视剧IP例如 《纸牌屋》和《白夜追凶》,同时也拥有超过1.9亿来自全球各地的订阅用户。对于Netflix来说,时刻让用户拥有最佳使用体验是最为重要的。其中一个维持它的秘诀就是使用深度学习模型来提升信息的价值。Netflix在不侵犯用户隐私的前提下,通过收集程序的日志(非用户数据)来分析检查系统的稳定性。具体实现则是利用深度学习模型 + 微服务架构来达到实时分析超大规模的日志的目标。

在这篇文章里,我们将一步一步介绍Netflix Observability团队是如何利用 Deep Java Library (DJL), 一个基于Java的深度学习框架来部署他们的迁移学习模型并达到实时对日志数据分析的。实时集群日志分析可以帮助团队洞悉在运行服务的输出结果,判断异常,从而可以直接从日志层面向运维团队发出警报。通过使用自然语言处理的深度学习模型,他们可以把日志归类到同一个ID下面从而减少日志冗余和存储上的压力。

 

举个例子来说,假设你通过日志发现了问题并想向服务方发出一个警报:

“com.amazonaws.SdkClientException: Unable to execute HTTP request: a231d9e5-c804-4beb-a5cc-ca0d10868d22: Temporary failure in name resolution” 

“SdkClientException: Unable to execute HTTP request: 4a13e174-8f4b-4572-98f8-7958a9330f77”

虽然日志本身内容不尽相同,但是他们都表达了类似的含义。那么,我们便可以将他们归类为同一个集群ID “557e605d3e9000b2”, 这样我们就可以将它们整合在一起变成一个警告。

 

那么具体的归类过程是如何实现的呢?输入是基于日志的字符数列。然后我们把它们输送到基于卷积神经网络 (CNN) 减少随机观察的文字,然后让通过 Universal Sentence Encoder (USE) 来把相关的语义密集矩阵截取出来。当我们有了这些矩阵向量后,我们向它们施加一个 locality-sensitive  哈希 (LSH) 算法将它们制作成一个ID,当然语义的相似度也被保留在了里面。最后通过使用DJL 来运行深度学习模型,然后利用Java产生对应的哈希ID。

 

预训练的 USE 模型是我们在生产环境中使用的归类模型之一。它通过接受输入句子,从而将这些东西转换为高维词向量。随后,用户可以对它们进行相似度分析。与此同时,异常检测或者警报也可以通过词向量信息中截取并自动触发。

 

通过DJL以及迁移学习,开发团队可以在 7ms 内完成对语义的截取,并且将它们归类到同一个集群ID下。上面的数据管道充分释放了迁移学习和全局日志管理的能力。.

生产过程的挑战

最初 Netflix observability 团队使用了基于Python的离线处理模式。可是,离线处理的方式使得警报延迟从而造成问题不能及时获得修复的后果。因此,团队决定重新制作一套在线的系统架构可以实现实时从上百个微服务中分析日志。

 

在制作过程中,团队遇到了很多麻烦:第一,目前的日志流在Netflix中使用了基于 Java/Scala的  Akka streams。Akka streams 是一个动态多线程分布式的 Java架构,如果直接将基于python的代码直接插入到现有的Java架构中会造成通信上的延迟,同时也没有办法达到预期的延迟率。第二,Netflix创建服务时,Java没有 TensorFlow 2.x的支持。第三,如果直接通过调用相关库的C++ API,可能会产生内存溢出等后续维护性的问题。简单来说,一个 Java虚拟机的垃圾回收机制一般是无法观测到C++的内存用量的。因此,Java的GC很难在短时间内将大量的无用内存清理掉。

 

解决方案

Netflix 采用了 DJL来应对Java兼容性以及内存管理的挑战。DJL是一个AWS开源的Java深度学习库,同时支持训练和推理任务。DJL选择建立在现有的深度学习引擎 (TensorFlow, PyTorch, MXNet, etc…)之上,并统一API设计,使得切换引擎时无需改变代码。它内建的模型库体系可以帮助用户更好的管理私有化模型,同时公版模型库提供了多达70个预训练模型。

 

DJL 的 Java API可以无缝衔接在 Akka, Akka streams, 和 Akka-Http (比如Netflix使用的数据流应用) 之上。得益于它的多线程支持,Akka的并行处理能力可以完全发挥出来。更重要的是,DJL通过构建NDManager解决了内存溢出问题。NDManager 可以直接接触到 C++层并且及时处理不用的对象。通过100多个小时的连续压力测试,DJL可以游刃有余的在生产环境中运行推理任务同时也不会出错。同时,Netflix也可以在基本不改变代码的情况下在未来接入更多框架。

 

那么我们通过一个简单推理pipeline来了解一下Netflix是如何利用 Akka-Http实现的深度学习在线架构吧。

 

 

Route

class DJLRoutes(djl: ActorRef[DjlInference.Command])(implicit val system: ActorSystem[_]) {

  // 请求超时设定
  private implicit val timeout = Timeout.create(Duration.ofSeconds(60))

  def getInference(text: InferenceRequest): Future[ActionPerformed] = djl.ask(InferVector(text, _))
  //#all-routes
  val djlRoutes: Route =
    pathPrefix("inferences") {
      concat(
        pathEnd {
          concat(
            post {
              entity(as[InferenceRequest]) { text =>
                onSuccess(getInference(text)) { performed =>
                  complete((StatusCodes.OK, performed))
                }
              }
            })
        },
       )
    }
  //#all-routes
}
 
       

Akka 使用了 Route 来建立RESTful 请求。如上面的代码所示,你可以设定一个 /inferences 端口来接收 POST 请求。在这里,我们调用了一个叫做 getInference 的方法来使 Actor 做推理任务。

Actor

下面是一个简单的 Akka Actor 配置。我们只需要简单的创建一个叫做 inferences 的方法就可以直接调用推理的API了。通过设计这个模块,Akka可以帮助扩张到多个并行的请求在多台机器上。

object DjlInference {
  // actor protocol
  sealed trait Command
  final case class InferVector(text: InferenceRequest, replyTo: ActorRef[ActionPerformed]) extends Command

  final case class ActionPerformed(vector: String)
  final case class InferenceRequest(text: String)

  def apply(): Behavior[Command] = inferences()

  private def inferences(): Behavior[Command] =
    Behaviors.receiveMessage {
      case InferVector(text, replyTo) =>
        val prediciton = predict(Collections.singletonList(text.text))(0).mkString("Array(", ", ", ")")
        replyTo ! ActionPerformed(prediciton)
        Behaviors.same
    }
}

DJL 推理

在定义好两个基本的 Akka 角色后,我们就可以开始实现主要的推理过程了。首先,我们尝试读取一下模型:

private final static ZooModel<String[], float[][]> loadModel() {
    try {
        if (!"TensorFlow".equals(Engine.getInstance().getEngineName())) {
            return null;
        }

        String modelUrl =
                "https://storage.googleapis.com/tfhub-modules/google/universal-sentence-encoder/4.tar.gz";

        Criteria<String[], float[][]> criteria =
                Criteria.builder()
                        .optApplication(Application.NLP.TEXT_EMBEDDING)
                        .setTypes(String[].class, float[][].class)
                        .optModelUrls(modelUrl)
                        .optTranslator(new MyTranslator())
                        .optProgress(new ProgressBar())
                        .build();

        return ModelZoo.loadModel(criteria);
    }catch (Exception ignored){
        return null;
    }
}
 
       

在DJL中读取模型十分简单,你只需要定义一个模型 URL 和对应的 Translator 就可以做前处理和后处理。在那之后,模型就会读入到内存中准备使用。

之后,我们直接创建一个 predict 推理方法,它会直接被 Akka的多线程调用使用。因为 DJL 在 0.8.0 版本的predictor不是线程安全的 (0.9.0开始所有predictor都是线程安全的),所以我们用 predictorHolder, 一个声明为 thread-local 对象来应对多线程。

public static float[][] predict(List<String> inputs)
        throws TranslateException {
    Predictor<String[], float[][]> predictor = predictorHolder.get();
    if (predictor == null) {
        predictor = model.newPredictor();
        predictorHolder.set(predictor);
    }
    return predictor.predict(inputs.toArray(new String[0]));
}

Application

最后,我们来到主要程序。在这边我们只需要把之前定义好的各个模块组合在一起。 首先定义 Actor 和 Route 并连接在一起。然后通过调用 startHttpServer方法来开始运行。

def main(args: Array[String]): Unit = {
//#server-bootstrapping
val rootBehavior = Behaviors.setup[Nothing] { context =>
    val djlInferenceActor = context.spawn(DjlInference(), "DjlInferenceActor")
    context.watch(djlInferenceActor)

    val routes = new DJLRoutes(djlInferenceActor)(context.system)
    startHttpServer(routes.djlRoutes)(context.system)

    Behaviors.empty
}
val system = ActorSystem[Nothing](rootBehavior, "DjlAkkaHttpServer")
//#server-bootstrapping
}

The following code defines the basic IP and port for the server.

private def startHttpServer(routes: Route)(implicit system: ActorSystem[_]): Unit = {
// Akka HTTP still needs a classic ActorSystem to start
import system.executionContext


val futureBinding = Http().newServerAt("localhost", 8080).bind(routes)
futureBinding.onComplete {
    case Success(binding) =>
    val address = binding.localAddress
    system.log.info("Server online at http://{}:{}/", address.getHostString, address.getPort)
    case Failure(ex) =>
    system.log.error("Failed to bind HTTP endpoint, terminating system", ex)
    system.terminate()
}
}

这个项目目前在 GitHub 上,用户可以更改配置轻松完成在计算集群上的分布式在线推理任务。

在现实的生产环境中,Netflix observability 在推理上的耗费时间仅仅只有2ms,包含语义分析以及产生集群ID。通过调整线程数目,以及使用NDManager,团队从而达到了稳定的性能,同时也给流数据处理,数据清理等留下了充足的空间。

 

总结

通过使用DJL,Netflix observability团队成功构建了基于TensorFlow 2.0的流数据深度学习平台。团队未来的目标是尝试DJL的训练 API和迁移学习应用。同时探索 DJL PyTorch和Apache MXNet 模块的性能表现。

 

想了解更多,请参见下面几个链接:

DJL官网: https://djl.ai

知乎专栏:DJL深度学习库

也欢迎加入DJL的 slack论坛 及中文微信群 (参见知乎)。