pmerienne/trident-ml

Language: Java

git: https://github.com/pmerienne/trident-ml

Trident-ML : A realtime online machine learning library

Trident-ML is a realtime online machine learning library. It allows you to build real-time predictive features using scalable online algorithms.
This library is built on top of Storm, a distributed stream processing framework which runs on a cluster of machines and supports horizontal scaling.
The packaged algorithms are designed to fit into limited memory and processing time, but they don't work in a distributed way.

Trident-ML currently supports:
* Linear classification (Perceptron, Passive-Aggressive, Winnow, AROW)
* Linear regression (Perceptron, Passive-Aggressive)
* Clustering (KMeans)
* Feature scaling (standardization, normalization)
* Text feature extraction
* Stream statistics (mean, variance)
* Pre-trained Twitter sentiment classifier

API Overview

Trident-ML is based on Trident, a high-level abstraction for doing realtime computing.
If you're familiar with high level batch processing tools like Pig or Cascading, the concepts of Trident will be very familiar.

It's recommended to read the Storm and Trident documentation.

Create instances

Trident-ML processes unbounded streams of data implemented as an infinite collection of Instance or TextInstance objects.
Creating instances is the first step in building a prediction tool.
Trident-ML offers Trident functions to convert Trident tuples into instances:

  • Create an Instance with an InstanceCreator:
TridentTopology toppology = new TridentTopology();

toppology
  // Emit tuples with 2 random features (named x0 and x1) and an associated boolean label (named label)
  .newStream("randomFeatures", new RandomFeaturesSpout())

  // Transform trident tuple to instance
  .each(new Fields("label", "x0", "x1"), new InstanceCreator<Boolean>(), new Fields("instance"));
  • Create a TextInstance with a TextInstanceCreator:
TridentTopology toppology = new TridentTopology();

toppology
  // Emit tuples containing text and associated label (topic)
  .newStream("reuters", new ReutersBatchSpout())

  // Convert trident tuple to text instance
  .each(new Fields("label", "text"), new TextInstanceCreator<Integer>(), new Fields("instance"));

Supervised classification

Trident-ML includes different algorithms for supervised classification:
* PerceptronClassifier implements a binary classifier based on an averaged kernel-based perceptron.
* WinnowClassifier implements the Winnow algorithm. It scales well to high-dimensional data and performs better than a perceptron when many dimensions are irrelevant.
* BWinnowClassifier is an implementation of the Balanced Winnow algorithm, an extension of the original Winnow algorithm.
* AROWClassifier is a simple and efficient implementation of Adaptive Regularization of Weights. It combines several useful properties: large margin training, confidence weighting, and the capacity to handle non-separable data.
* PAClassifier implements the Passive-Aggressive binary classifier, a margin-based learning algorithm.
* MultiClassPAClassifier is a variant of Passive-Aggressive that performs one-vs-all multiclass classification.

These classifiers learn from a datastream of labeled Instance objects using a ClassifierUpdater.
Another datastream of unlabeled instances can be classified with a ClassifyQuery.

The following example learns a NAND function and classifies instances coming from a DRPC stream.

TridentTopology toppology = new TridentTopology();

// Create perceptron state from labeled instances stream
TridentState perceptronModel = toppology
  // Emit tuple with a labeled instance of enhanced NAND features
  // i.e. : {label=true, features=[1.0 0.0 1.0]} or {label=false, features=[1.0 1.0 1.0]}  
  .newStream("nandsamples", new NANDSpout())

  // Update perceptron
  .partitionPersist(new MemoryMapState.Factory(), new Fields("instance"), new ClassifierUpdater<Boolean>("perceptron", new PerceptronClassifier()));

// Classify instance from a DRPC stream
toppology.newDRPCStream("predict", localDRPC)
  // Transform DRPC ARGS to unlabeled instance
  .each(new Fields("args"), new DRPCArgsToInstance(), new Fields("instance"))

  // Classify instance using perceptron state
  .stateQuery(perceptronModel, new Fields("instance"), new ClassifyQuery<Boolean>("perceptron"), new Fields("prediction"));
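For intuition, the state update that a ClassifierUpdater applies per labeled instance reduces, in the plain perceptron case, to the classic mistake-driven rule. A minimal plain-Java sketch of that rule (illustration only; Trident-ML's PerceptronClassifier additionally averages weights and supports kernels):

```java
// Minimal mistake-driven online perceptron (illustration only, not
// Trident-ML's actual PerceptronClassifier implementation).
public class OnlinePerceptron {
    private final double[] weights;
    private final double learningRate;

    public OnlinePerceptron(int dimensions, double learningRate) {
        this.weights = new double[dimensions];
        this.learningRate = learningRate;
    }

    // Predict the label as the sign of the dot product <w, x>
    public boolean predict(double[] features) {
        double activation = 0.0;
        for (int i = 0; i < weights.length; i++) {
            activation += weights[i] * features[i];
        }
        return activation >= 0.0;
    }

    // Update the weights only when the current prediction is wrong
    public void update(double[] features, boolean label) {
        if (predict(features) != label) {
            double direction = label ? 1.0 : -1.0;
            for (int i = 0; i < weights.length; i++) {
                weights[i] += learningRate * direction * features[i];
            }
        }
    }
}
```

Trained on NAND-style samples with a constant bias feature (as in the NANDSpout comment above), a few passes are enough for the weights to separate the two classes.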

Trident-ML provides the KLDClassifier, which implements a text classifier using the Kullback-Leibler distance.
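The Kullback-Leibler distance underlying this classifier compares a document's word distribution with each class's language model. A minimal sketch of the divergence computation (an illustration with naive smoothing, not Trident-ML's actual code):

```java
import java.util.Map;

// Kullback-Leibler divergence D_KL(P || Q) = sum_i P(i) * ln(P(i) / Q(i))
// between two discrete word distributions. Illustration of the distance
// a KLD-style text classifier ranks classes by; not Trident-ML's code.
public class KLDistance {
    public static double kl(Map<String, Double> p, Map<String, Double> q) {
        double d = 0.0;
        for (Map.Entry<String, Double> e : p.entrySet()) {
            double pi = e.getValue();
            if (pi > 0.0) {
                // Naive smoothing for words absent from the class model
                double qi = q.getOrDefault(e.getKey(), 1e-10);
                d += pi * Math.log(pi / qi);
            }
        }
        return d;
    }
}
```

The divergence is zero for identical distributions and grows as the document's word usage departs from a class model, so the predicted class is the one with the smallest distance.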

Here's the code to build a news classifier using the Reuters dataset:

TridentTopology toppology = new TridentTopology();

// Create KLD classifier state from labeled instances stream
TridentState classifierState = toppology
  // Emit tuples containing text and associated label (topic)
  .newStream("reuters", new ReutersBatchSpout())

  // Convert trident tuple to text instance
  .each(new Fields("label", "text"), new TextInstanceCreator<Integer>(), new Fields("instance"))

  // Update classifier
  .partitionPersist(new MemoryMapState.Factory(), new Fields("instance"), new TextClassifierUpdater("newsClassifier", new KLDClassifier(9)));

// Classification stream
toppology.newDRPCStream("classify", localDRPC)

  // Convert DRPC args to text instance
  .each(new Fields("args"), new TextInstanceCreator<Integer>(false), new Fields("instance"))

  // Query classifier with text instance
  .stateQuery(classifierState, new Fields("instance"), new ClassifyTextQuery("newsClassifier"), new Fields("prediction"));

Unsupervised classification

KMeans is an implementation of the well-known k-means algorithm, which partitions instances into clusters.
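For intuition, an online variant of k-means assigns each incoming instance to its nearest centroid and nudges that centroid toward it. A minimal MacQueen-style sketch (illustration only, not Trident-ML's actual implementation):

```java
// Minimal sequential (online) k-means: assign each instance to the
// nearest centroid, then move that centroid toward the instance so it
// stays the running mean of its cluster. Illustration only.
public class OnlineKMeans {
    private final double[][] centroids;
    private final int[] counts;

    public OnlineKMeans(double[][] initialCentroids) {
        this.centroids = initialCentroids;
        this.counts = new int[initialCentroids.length];
    }

    // Index of the centroid closest to x (squared Euclidean distance)
    public int nearest(double[] x) {
        int best = 0;
        double bestDist = Double.MAX_VALUE;
        for (int c = 0; c < centroids.length; c++) {
            double dist = 0.0;
            for (int i = 0; i < x.length; i++) {
                double diff = centroids[c][i] - x[i];
                dist += diff * diff;
            }
            if (dist < bestDist) { bestDist = dist; best = c; }
        }
        return best;
    }

    // MacQueen update: the winning centroid moves by 1/n toward the point
    public int update(double[] x) {
        int c = nearest(x);
        counts[c]++;
        for (int i = 0; i < x.length; i++) {
            centroids[c][i] += (x[i] - centroids[c][i]) / counts[c];
        }
        return c;
    }
}
```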

Use a ClusterUpdater or a ClusterQuery to respectively update clusters or query the clusterer:

TridentTopology toppology = new TridentTopology();

// Training stream
TridentState kmeansState = toppology
  // Emit tuples with an instance containing an integer as label and 3 double features (named x0, x1 and x2)
  .newStream("samples", new RandomFeaturesForClusteringSpout())

  // Convert trident tuple to instance
  .each(new Fields("label", "x0", "x1", "x2"), new InstanceCreator<Integer>(), new Fields("instance"))

  // Update a 3 classes kmeans
  .partitionPersist(new MemoryMapState.Factory(), new Fields("instance"), new ClusterUpdater("kmeans", new KMeans(3)));

// Cluster stream
toppology.newDRPCStream("predict", localDRPC)
  // Convert DRPC args to instance
  .each(new Fields("args"), new DRPCArgsToInstance(), new Fields("instance"))

  // Query kmeans to classify instance
  .stateQuery(kmeansState, new Fields("instance"), new ClusterQuery("kmeans"), new Fields("prediction"));

Stream statistics

Stream statistics such as mean, standard deviation and count can be easily computed using Trident-ML.
These statistics are stored in a StreamStatistics object.
Statistics update and query are performed using a StreamStatisticsUpdater and a StreamStatisticsQuery respectively:

TridentTopology toppology = new TridentTopology();

// Update stream statistics
TridentState streamStatisticsState = toppology
  // emit tuples with random features
  .newStream("randomFeatures", new RandomFeaturesSpout())

  // Transform trident tuple to instance
  .each(new Fields("x0", "x1"), new InstanceCreator(), new Fields("instance"))

  // Update stream statistics
  .partitionPersist(new MemoryMapState.Factory(), new Fields("instance"), new StreamStatisticsUpdater("randomFeaturesStream", StreamStatistics.fixed()));

// Query stream statistics (with DRPC)
toppology.newDRPCStream("queryStats", localDRPC)
  // Query stream statistics
  .stateQuery(streamStatisticsState, new StreamStatisticsQuery("randomFeaturesStream"), new Fields("streamStats"));
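For intuition, mean and variance can be maintained in a single pass with a running update; Welford's algorithm is the standard numerically stable formulation. A sketch of the idea (not Trident-ML's internal code):

```java
// One-pass (streaming) mean/variance via Welford's algorithm, a sketch
// of the kind of running update a stream-statistics state maintains.
// Not Trident-ML's actual implementation.
public class RunningStats {
    private long count = 0;
    private double mean = 0.0;
    private double m2 = 0.0; // running sum of squared deviations

    public void add(double x) {
        count++;
        double delta = x - mean;
        mean += delta / count;
        m2 += delta * (x - mean);
    }

    public long getCount() { return count; }
    public double getMean() { return mean; }

    // Population variance; returns 0 until at least two samples are seen
    public double getVariance() { return count > 1 ? m2 / count : 0.0; }
    public double getStdDev() { return Math.sqrt(getVariance()); }
}
```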

Note that Trident-ML can support concept drift in a sliding-window manner.
Use StreamStatistics#adaptive(maxSize) instead of StreamStatistics#fixed() to construct a StreamStatistics implementation with a window of maxSize length.
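The sliding-window idea can be sketched with a bounded queue: evicting old samples is what lets the estimate follow a drifting stream. An illustration of the concept behind the adaptive variant, not its implementation:

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sliding-window mean: samples older than maxSize no longer influence
// the statistic, so the estimate tracks concept drift. Sketch of the
// idea behind an adaptive (windowed) stream statistic.
public class WindowedMean {
    private final int maxSize;
    private final Deque<Double> window = new ArrayDeque<>();
    private double sum = 0.0;

    public WindowedMean(int maxSize) { this.maxSize = maxSize; }

    public void add(double x) {
        window.addLast(x);
        sum += x;
        if (window.size() > maxSize) {
            sum -= window.removeFirst(); // evict the oldest sample
        }
    }

    public double getMean() {
        return window.isEmpty() ? 0.0 : sum / window.size();
    }
}
```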

Preprocessing data

Data preprocessing is an important step in the data mining process.
Trident-ML provides Trident functions to transform raw features into a representation that is more suitable for machine learning algorithms.

  • Normalizer scales individual instances to have unit norm.
TridentTopology toppology = new TridentTopology();

toppology
  // Emit tuples with 2 random features (named x0 and x1) and an associated boolean label (named label)
  .newStream("randomFeatures", new RandomFeaturesSpout())

  // Convert trident tuple to instance
  .each(new Fields("label", "x0", "x1"), new InstanceCreator<Boolean>(), new Fields("instance"))

  // Scales features to unit norm
  .each(new Fields("instance"), new Normalizer(), new Fields("scaledInstance"));
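Under the hood, unit-norm (L2) scaling divides every feature by the instance's Euclidean norm. A minimal sketch of that arithmetic (illustration only):

```java
// L2 normalization: divide each feature by the Euclidean norm so the
// scaled vector has length 1. Sketch of per-instance unit-norm scaling.
public class L2Norm {
    public static double[] normalize(double[] features) {
        double sumSquares = 0.0;
        for (double f : features) sumSquares += f * f;
        double norm = Math.sqrt(sumSquares);
        double[] scaled = new double[features.length];
        for (int i = 0; i < features.length; i++) {
            // Leave an all-zero vector unchanged to avoid division by zero
            scaled[i] = norm == 0.0 ? 0.0 : features[i] / norm;
        }
        return scaled;
    }
}
```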
  • StandardScaler transforms raw features into standard normally distributed data (Gaussian with zero mean and unit variance). It uses stream statistics to remove the mean and scale to unit variance.
TridentTopology toppology = new TridentTopology();

toppology
  // Emit tuples with 2 random features (named x0 and x1) and an associated boolean label (named label)
  .newStream("randomFeatures", new RandomFeaturesSpout())

  // Convert trident tuple to instance
  .each(new Fields("label", "x0", "x1"), new InstanceCreator<Boolean>(), new Fields("instance"))

  // Update stream statistics
  .partitionPersist(new MemoryMapState.Factory(), new Fields("instance"), new StreamStatisticsUpdater("streamStats", new StreamStatistics()), new Fields("instance", "streamStats")).newValuesStream()

  // Standardize stream using original stream statistics
  .each(new Fields("instance", "streamStats"), new StandardScaler(), new Fields("scaledInstance"));
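The standardization step itself is just (x - mean) / stdDev per feature, computed from the stream statistics. A minimal sketch of that arithmetic (illustration only, with the statistics supplied externally):

```java
// Standardization: subtract the stream mean and divide by the stream
// standard deviation, feature by feature. Sketch of the scaling
// arithmetic, using externally supplied per-feature statistics.
public class Standardize {
    public static double[] scale(double[] features, double[] means, double[] stdDevs) {
        double[] scaled = new double[features.length];
        for (int i = 0; i < features.length; i++) {
            // Guard against zero variance to avoid division by zero
            scaled[i] = stdDevs[i] == 0.0 ? 0.0 : (features[i] - means[i]) / stdDevs[i];
        }
        return scaled;
    }
}
```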

Pre-trained classifier

Trident-ML includes a pre-trained Twitter sentiment classifier.
It was built on a subset of the Twitter Sentiment Corpus by Niek Sanders with a multi-class PA classifier and classifies raw tweets as positive (true) or negative (false).
This classifier is implemented as a Trident function and can easily be used in a Trident topology:

TridentTopology toppology = new TridentTopology();

// Classification stream
toppology.newDRPCStream("classify", localDRPC)
  // Query classifier with text instance
  .each(new Fields("args"), new TwitterSentimentClassifier(), new Fields("sentiment"));

Maven integration:

Trident-ML is hosted on Clojars (a Maven repository).
To include Trident-ML in your project, add the following to your pom.xml:
```xml
<repositories>
  <repository>
    <id>clojars.org</id>
    <url>http://clojars.org/repo</url>
  </repository>
</repositories>

<dependency>
  <groupId>com.github.pmerienne</groupId>
  <artifactId>trident-ml</artifactId>
  <version>0.0.4</version>
</dependency>
```

Does trident-ml support distributed learning?

Storm allows trident-ml to process batches of tuples in a distributed way (batches will be computed among several nodes). This means that trident-ml can scale horizontally with workload.

However, Storm prevents state updates from happening simultaneously, and the model learning is done in a state update. That's why the learning step can't be distributed. Thankfully, this lack of parallelization isn't a real bottleneck because the incremental algorithms are very fast (and simple!).

Distributed algorithms will not be implemented in trident-ml; the whole design prevents this.

So you can't achieve distributed learning, but you can still partition the streams to pre-process/enrich your data in a distributed manner.

Copyright and license

Copyright 2013-2015 Pierre Merienne

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.