TensAIR: Real-Time Training of Neural Networks from Data-streams

Mauro D. L. Tosi (mauro.dalleluccatosi@uni.lu, 0000-0002-0218-2413), University of Luxembourg, Luxembourg; Vinu E. Venugopal (vinu.ev@iiitb.ac.in, 0000-0003-4429-9932), IIIT Bangalore, India; and Martin Theobald (martin.theobald@uni.lu, 0000-0003-4067-7609), University of Luxembourg, Luxembourg

(2024)

Abstract.

Online learning (OL) from data streams is an emerging area of research that encompasses numerous challenges from stream processing, machine learning, and networking. Stream-processing platforms, such as Apache Kafka and Flink, have basic extensions for the training of Artificial Neural Networks (ANNs) in a stream-processing pipeline. However, these extensions were not designed to train ANNs in real-time, and they suffer from performance and scalability issues when doing so.

This paper presents TensAIR, the first OL system for training ANNs in real time. TensAIR achieves remarkable performance and scalability by using a decentralized and asynchronous architecture to train ANN models (either freshly initialized or pre-trained) via DASGD (decentralized and asynchronous stochastic gradient descent). We empirically demonstrate that TensAIR achieves a nearly linear scale-out performance in terms of (1) the number of worker nodes deployed in the network, and (2) the throughput at which the data batches arrive at the dataflow operators. We depict the versatility of TensAIR by investigating both sparse (word embedding) and dense (image classification) use cases, for which TensAIR achieved from 6 to 116 times higher sustainable throughput rates than state-of-the-art systems for training ANNs in a stream-processing pipeline.

Online Learning, Neural Networks, Asynchronous Stream Processing

Copyright: rights retained by the authors. Conference: The 8th International Conference on Machine Learning and Soft Computing (ICMLSC 2024), January 26–28, 2024, Singapore, Singapore. DOI: 10.1145/3647750.3647762. ISBN: 979-8-4007-1654-6/24/01.

1. Introduction

Online learning (OL) is a branch of Machine Learning (ML) which studies solutions to time-sensitive problems that demand real-time answers based on fractions of data received in the form of data streams (Hoi et al., 2021). A common characteristic of data streams is the presence of concept drifts (Iwashita and Papa, 2018), i.e., changes in the statistical properties of the incoming data objects over time (Lu et al., 2018). Consequently, pre-trained ML models tend to be inadequate in OL scenarios, as their performance usually decreases after concept drifts (Lu et al., 2018). In contrast, OL models mitigate the negative effects of such concept drifts by being ready to instantly update themselves on any new data from the data stream (Hoi et al., 2021).

Due to the intrinsic time sensitivity of OL, it is not feasible to depend on solutions that spend an undue amount of time on retraining. Thus, when concept drifts are frequent, complex OL problems currently cannot rely on robust solutions that are common to other ML problems, such as those involving Artificial Neural Networks (ANNs) (Goodfellow et al., 2016), due to their long training times (Hoi et al., 2021).

Therefore, how to solve complex OL problems, like those involving data streams of audio, video, or even text, remains an open research question, especially when these streams are affected by frequent concept drifts. Currently, most OL research focuses on how to improve the quality of the input data and how to adapt to concept drifts (Lu et al., 2018; Hu et al., 2020; Priya and Uthra, 2021; Barros and Santos, 2018; Sethi and Kantardzic, 2017), rather than on solving such complex problems. Our intuition is that current hardware is already capable of training a large class of ANN models in real time if the training is efficiently distributed across multiple nodes. In that case, problems that today are deemed too complex to be effectively solved by standard OL learners could be solved using ANNs.

Nowadays, instead of retraining ANNs in real time, state-of-the-art extensions (dl-, 2022; kaf, 2022) for Apache Flink (Carbone et al., 2015) and Kafka (Kreps et al., 2011) adapt/retrain their models using datasets created from buffered samples of the data stream. These approaches thus enable the use of ANNs in OL by giving up the real-time adaptation of the ANN models, which can only be updated after buffering a dataset large enough for retraining. If adapted to train in real time, these approaches suffer from performance and scalability issues (cf. Section 4) and hence cannot sustain throughputs high enough for many real-world problems.

Consequently, when real-time adaptation is not available, one must expect a lower prediction/inference performance of the models between the instant a concept drift occurs and the moment the model is updated. Moreover, considering that non-trivial ANN models demand a large number of training examples before convergence, one can expect low-quality predictions/inferences for an extended period of time (until the training dataset is buffered and the model is retrained). This makes such an approach unfeasible for real-world problems that suffer from frequent concept drifts.

To mitigate this decrease in prediction/inference performance, we argue that it is necessary to adapt the ANN models in real time. However, the real-time adaptation of ANN models in an OL scenario is not straightforward. We therefore highlight the following two challenges:

  (1) Real-time data management: not all training data is available from the beginning. It is therefore necessary to incrementally update the model with fractions of data at each step, unlike with commonly used pre-defined training datasets.

  (2) Backpressure: the model must process more data samples per second (for training and for inference/prediction) than the data stream produces, in order to avoid a sudden surge in latency or even a system crash.

In this paper, we present the architecture of TensAIR, the first OL framework for training ANN models (either freshly initialized or pre-trained) in real time. TensAIR leverages the fact that stochastic gradient descent (SGD) is an iterative method that can update a model based only on a fraction of the training data per iteration. Thus, instead of using pre-defined or buffered datasets for training, TensAIR models are updated as soon as each data sample (or data batch) is made available by the data stream. In addition, TensAIR achieves remarkable scale-out performance by using a fully decentralized and asynchronous architecture throughout its whole dataflow, thus leveraging DASGD (decentralized and asynchronous SGD) to update the ANN models.
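To make this concrete, the following is a minimal sketch (not TensAIR's actual C++ implementation) of such a per-batch online SGD loop, assuming a Keras model and a generator that stands in for the data stream; all names are illustrative.

    import numpy as np
    import tensorflow as tf

    model = tf.keras.Sequential([tf.keras.layers.Dense(10)])
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    opt = tf.keras.optimizers.SGD(learning_rate=0.01)

    def stream():
        # Stand-in for an unbounded data stream of (x, y) mini-batches.
        while True:
            yield (np.random.rand(32, 8).astype("float32"),
                   np.random.randint(0, 10, size=32))

    # One SGD update per arriving mini-batch -- no buffered dataset is needed.
    for step, (x, y) in zip(range(1000), stream()):   # bounded here only for demo
        with tf.GradientTape() as tape:
            loss = loss_fn(y, model(x, training=True))
        grads = tape.gradient(loss, model.trainable_variables)
        opt.apply_gradients(zip(grads, model.trainable_variables))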

To assess TensAIR, we performed experiments on sparse (word embedding) and dense (image classification) models. In our experiments, TensAIR achieved nearly linear scale-out performance in terms of (1) the number of worker nodes deployed in the network, and (2) the throughput at which the data batches arrive at the dataflow operators. Moreover, we observed the same convergence rate of the distributed models independently of the number of worker nodes, which shows that the usage of DASGD did not negatively impact the models' convergence. When compared to the state of the art, TensAIR's sustainable throughput in the real-time OL setting was from 6 to 175 times higher than the Apache Kafka extension (kaf, 2022) and from 6 to 120 times higher than the Apache Flink extension (dl-, 2022). We additionally compared TensAIR to Horovod (Sergeev and Del Balso, 2018), a distributed ANN framework developed by Uber, and achieved from 4 to 335 times higher sustainable throughput in the same real-time OL setting.

Below, we summarize the main contributions of this paper.

Contributions

  (1) Design and implementation of TensAIR, the first framework for real-time training and prediction with ANN models.

  (2) Creation and usage of our decentralized and asynchronous SGD (DASGD) algorithm.

  (3) Experimental evaluation of TensAIR showing an almost linear training-time speed-up in terms of the nodes deployed.

  (4) Sustainable-throughput comparison between TensAIR and state-of-the-art systems, with TensAIR achieving from 4 to 120 times higher sustainable throughput than the baselines.

  (5) Depiction of a real-time sentiment-analysis use case that would not be feasible with standard OL approaches.

2. Background

Considering that the real-time training of ANNs in an OL scenario involves multiple areas of research, we give in the following subsections a short summary of the most important concepts and techniques used in this paper.

2.1. Online Learning

Online learning (OL) has gained visibility due to the increase in the velocity and volume of available data sources over the past decade (Gomes et al., 2019). OL algorithms are trained on data streams, which differs from traditional ML algorithms that rely on a pre-defined training dataset.

Streams & Batches. Formally, a data stream $\mathcal{S}$ consists of ordered events $e$ with timestamps $s$, i.e., $(e_1, s_1), \ldots, (e_\infty, s_\infty)$, where the $s_i$ denote the processing time at which the corresponding events $e_i$ are ingested into the system. These events are usually analysed in batches $B_j$ of fixed size $b$, as follows:

$B_1 = (e_1, s_1), \ldots, (e_b, s_b)$
$B_2 = (e_{b+1}, s_{b+1}), \ldots, (e_{2b}, s_{2b})$
$\ldots$

Batches $B_j$ are analyzed individually. Thus, if processed in an asynchronous stream-processing scenario, the batches (and in particular the included events $e_i$) can become out-of-order as they are handled within the system, even if the initial $s_i$ were ordered. In common stream-processing architectures, such as Apache Flink (Carbone et al., 2015), Spark (Zaharia et al., 2016) and Samza (Noghabi et al., 2017), batches are distinguished into sliding windows, tumbling windows, and (per-user) sessions (Akidau et al., 2015).
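As a small illustration of this batching semantics, the sketch below groups a timestamped event iterator into tumbling batches of fixed size $b$ (a minimal sketch; the function name is ours):

    from itertools import islice

    def tumbling_batches(events, b):
        """events: iterator of (e_i, s_i) pairs; yields B_1, B_2, ... of size b."""
        it = iter(events)
        while True:
            batch = list(islice(it, b))   # the next b events form batch B_j
            if not batch:
                return
            yield batch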

Latency vs. Throughput. When analyzing systems that process data streams, one typically benchmarks them by their latency and throughput (Karimov etal., 2018). Formally, latency is the time it takes for a system to process an event, from the moment it is ingested to the moment it is used to produce a desired output. Throughput, on the other hand, is the number of events that a system can receive and process per time unit. The sustainable throughput is the maximum throughput at which a system can handle a stream over a sustained period of time (i.e., without exhibiting a sudden surge in latency, then called “backpressure” (Kulkarni etal., 2015), or even a crash).
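The following sketch shows how these two metrics can be measured for a simple event handler, assuming each event arrives paired with its ingestion timestamp $s_i$ (all names are illustrative):

    import time

    def benchmark(events, handle):
        """events: iterable of (e_i, s_i) pairs, s_i taken from time.monotonic()."""
        latencies, start = [], time.monotonic()
        for e, s in events:
            handle(e)                                   # produce the desired output
            latencies.append(time.monotonic() - s)      # ingest-to-output latency
        throughput = len(latencies) / (time.monotonic() - start)  # events/second
        return latencies, throughput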

Passive & Active Drift Adaptation. To adapt to concept drifts, one may rely on either passive or active adaptation strategies (Heusinger etal., 2020). The passive strategy updates the trained model indefinitely, with no regard to the actual presence of concept drifts. Active drift adaptation strategies, on the other hand, only adapt the model when a concept drift has been explicitly identified.

2.2. Artificial Neural Networks

ANNs denote a family of supervised ML algorithms which are designed to be trained on a pre-defined dataset (Goodfellow et al., 2016). A training dataset is composed of multiple $(x, y)$ pairs, in which $x$ is a training example and $y$ is its corresponding label. ANNs are usually trained using mini-batches $X$, which are sets of $(x, y)$ pairs of fixed size $N$ that are iteratively (randomly) sampled from the training dataset, thus $X = \{(x_i, y_i), \ldots, (x_{i+N}, y_{i+N})\}$.

An ANN model is represented by the weights and biases of the network, described together by $\theta$, and it is usually trained with variants of stochastic gradient descent (SGD) (Robbins and Monro, 1951). SGD updates $\theta$ by considering $\nabla L(X, \theta)$, which is the gradient of a pre-defined loss function $L$ with respect to $\theta$ when taking $X$ as input. We can thus represent the update rule of $\theta$ as in Equation 1, in which $t$ is the SGD iteration and $\alpha$ is a pre-defined learning rate.

(1)  $\theta_{t+1} = \theta_t - \alpha \nabla L(X, \theta_t)$

Based on Equation 1, $\theta_{t+1}$ is defined by two terms. The second term is the more computationally expensive one to calculate, which we refer to as the gradient calculation (GC). The remainder of the equation we call the gradient application (GA), which consists of the subtraction of the two terms and the assignment of the result to $\theta_{t+1}$.
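In TensorFlow terms, this GC/GA split can be sketched as follows (a minimal sketch, not TensAIR's actual code; the function names are ours):

    import tensorflow as tf

    def gradient_calculation(model, loss_fn, X, y):
        # GC: the expensive term, computing grad L(X, theta_t) via backpropagation.
        with tf.GradientTape() as tape:
            loss = loss_fn(y, model(X, training=True))
        return tape.gradient(loss, model.trainable_variables)

    def gradient_application(model, grads, alpha=0.01):
        # GA: theta_{t+1} = theta_t - alpha * grad, applied variable by variable.
        for var, g in zip(model.trainable_variables, grads):
            var.assign_sub(alpha * g)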

Distributed Artificial Neural Networks. Over the last years, ANN models have substantially grown in size and complexity. Consequently, traditional centralized architectures have become unfeasible for training complex models due to the high amount of time they spend until convergence (Sergeev and Del Balso, 2018). To mitigate this, researchers have been studying how to distribute ANN training. Distributed ANNs reduce the time it takes to train a complex ANN model by distributing its computation across multiple compute nodes. This distribution can follow different parallelization methods, system architectures, and synchronisation settings (Mayer and Jacobsen, 2020).

The most common form of distributing ANNs, which we also use in this work, is referred to as data parallelism (Ouyang et al., 2021), in which workers are initialized with replicas of the same initial model and trained with disjoint splits of the training data. Moreover, the synchronisation among the workers' parameters in a data-parallel ANN setting is either centralised or decentralised (Mayer and Jacobsen, 2020). In a centralised architecture (Ouyang et al., 2021), workers systematically send their parameter updates to one or multiple parameter servers, which aggregate the updates of all workers and apply them to a centralised model (Mayer and Jacobsen, 2020). By relying on parameter servers to aggregate updates, these servers may become the bottleneck of such an architecture (Chen et al., 2019). In a decentralised architecture (Ouyang et al., 2021), on the other hand, the workers synchronize themselves using a broadcast-like form of communication. This broadcast eliminates the bottleneck of the parameter servers but requires direct communication among the worker nodes.

The parameter updates in a data-parallel ANN system can be synchronous or asynchronous. In a synchronous setting (Ouyang et al., 2021), workers have to synchronize themselves after each mini-batch iteration. This synchronization barrier wastes computational resources through idle times (i.e., when workers have to wait for others before resuming their computation) (Mayer and Jacobsen, 2020). In an asynchronous SGD (ASGD) setting (Ouyang et al., 2021), workers are allowed to compute gradients on stale model parameters. This behaviour minimizes idle times but makes it harder to mathematically prove SGD convergence. Recent developments on ASGD (Jiang et al., 2017; Sun et al., 2017; Zhang et al., 2020), however, have tackled exactly this issue under different assumptions. Zhang et al. (2020) recently proved an $o(1/\sqrt{k})$ convergence rate for unbounded non-convex problems using ASGD under a centralised parameter-server setup (where $k$ denotes the iteration among the ASGD updates). Additionally, (Lian et al., 2018; Bornstein et al., 2023; Tosi and Theobald, 2023) proved the convergence of ASGD on decentralized networks under distinct assumptions and network topologies.

3. TensAIR

We now introduce the architecture of TensAIR, the first framework for training and predicting with ANN models in real time. TensAIR was designed to work in association with stream-processing engines that allow asynchronous and decentralized communication among dataflow operators.

TensAIR introduces the data-parallel, decentralized, asynchronous ANN operator Model, with train and predict as two new OL functions. This means that TensAIR can scale out both the training and prediction tasks of an ANN model to multiple compute nodes, either with or without GPUs associated with them. The TensAIR dataflow can be visualized as a graph (see Figure 1). Note that, throughout this paper, we use the terms prediction and inference interchangeably.

[Figure 1. A generic TensAIR dataflow.]

TensAIR Dataflows. Figure 1 depicts a generic TensAIR dataflow, composed of a single input data stream, n instances of the Model operator, and single instances of the Split and UDF operators. The idea behind a TensAIR dataflow is: (1) to receive training samples from the input data stream; (2) to pre-process the received data using common dataflow operators like Map, Reduce, Split, and Join, transforming the data as deemed necessary for each use case; (3) to select whether the pre-processed data samples will be used for training, for prediction, or for both; (4) if the data is sent for training, to aggregate a pre-defined number of samples into a mini-batch by using a user-defined function UDF, and to send this mini-batch to one of the decentralized Model instances; (4a) when a Model instance receives a mini-batch $X$ from the UDF, to calculate an update $\nabla x$ based on the current Model weights and the mini-batch $X$, to apply the update to itself, and to broadcast the update to the other Model instances; (4b) when $Model_i$ receives an update from $Model_j$, to apply the received update locally; (5) if the pre-processed data is sent for prediction, to randomly select one of the distributed models and use it to perform the prediction; (6) to use the prediction as the final output of the dataflow or as input to further operators (as deemed necessary by the given use case).

Stream Processing. As shown in Algorithm 1, a TensAIR Model operator has two new OL functions, train and predict, which can asynchronously send and receive messages to and from other operators. During train, Model receives either encoded mini-batches $X$ or gradients $\nabla x$ as messages. Each message encoding a gradient that was computed by another Model instance is immediately used to update the local model accordingly. Each mini-batch first invokes a local gradient computation and is then used to update the local model. Each such resulting gradient is also summed into a local buffer until a desired number of gradients (maxGradBuffer) is reached, upon which the buffer is broadcast to all other Model instances.

Algorithm 1: The TensAIR Model operator.

Constructor Model(tfModel, maxBuffer):
    model = tfModel
    maxGradBuffer = maxBuffer
    gradients = ∅
    gradients_count = 0

procedure process_msg(msg)
    if msg.mode == TRAIN then
        train(msg)
    else
        predict(msg)

procedure train(msg)
    if msg.isGradient then
        model = apply_gradient(model, msg)
    else
        gradient = calculate_gradient(model, msg)
        model = apply_gradient(model, gradient)
        gradients += gradient
        gradients_count += 1
        if gradients_count ≥ maxGradBuffer then
            send_gradients(gradients)
            gradients_count = 0

procedure predict(msg)
    predictions = model.make_predictions(msg)
    send_results(predictions)
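For readers who prefer executable code over pseudocode, the following is a minimal single-process Python rendering of Algorithm 1, in which calculate_gradient, apply_gradient, send_gradients, and send_results are assumed to be supplied by the surrounding engine (all such names are assumptions, not TensAIR's actual API):

    class Model:
        def __init__(self, tf_model, max_buffer):
            self.model = tf_model
            self.max_grad_buffer = max_buffer
            self.gradients = []          # locally computed, not yet broadcast
            self.gradients_count = 0

        def process_msg(self, msg):
            if msg.mode == "TRAIN":
                self.train(msg)
            else:
                self.predict(msg)

        def train(self, msg):
            if msg.is_gradient:          # gradient computed by another instance
                self.model = apply_gradient(self.model, msg)
            else:                        # mini-batch received from the UDF
                gradient = calculate_gradient(self.model, msg)
                self.model = apply_gradient(self.model, gradient)
                self.gradients.append(gradient)   # buffer (or sum) the gradient
                self.gradients_count += 1
                if self.gradients_count >= self.max_grad_buffer:
                    send_gradients(self.gradients)   # broadcast to other instances
                    self.gradients, self.gradients_count = [], 0

        def predict(self, msg):
            send_results(self.model.make_predictions(msg))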

3.1. Model Consistency

Despite TensAIR's asynchronous nature, it is necessary to keep the models consistent among themselves during training in order to guarantee that they are aligned and, therefore, eventually converge to the same common model. In TensAIR, this is achieved by the exchange of gradients between the various Model instances.

Due to our asynchronous computation and application of the gradients on the distributed model instances, $Model_i$ receives gradients calculated by $Model_j$ (with $j \neq i$) on a model that is similar but not necessarily equal to its own. This occurs whenever $Model_i$, which has already applied a set $G_i = \{\nabla x, \nabla y, \ldots, \nabla z\}$ of gradients to itself, calculates a new gradient $\nabla a$ and sends it to $Model_j$, such that $G_i \neq G_j$ at the time when $Model_j$ applies $\nabla a$. The difference $|G_i \cup G_j| - |G_i \cap G_j|$ between these two models is defined as staleness (Tosi et al., 2022). This $\mathit{staleness}_{i,j}(\nabla a)$ metric is the symmetric distance between $G_i$ and $G_j$ with respect to the times at which a new gradient $\nabla a$ was computed by a model $i$ and applied to a model $j$, respectively. We illustrate this phenomenon and the staleness metric in Figure 2.

[Figure 2. Timeline of mini-batch and gradient messages exchanged among TensAIR models.]

Figure 2 illustrates the timeline of messages (containing both mini-batches and gradients) exchanged among TensAIR models with $\mathit{maxGradBuffer} = 1$. Assume the UDF distributes 5 mini-batches to 3 models. After receiving their first mini-batch, each $Model_i$ calculates a corresponding gradient. Note that, when applied locally, the staleness of any gradient is 0, because it is computed and immediately applied by the same model. While computing or applying a local gradient, each $Model_i$ may asynchronously receive more gradients to calculate and/or apply from either the UDF or the other models. In our protocol, the models first finish their current gradient computation, apply it locally, then buffer and send $\mathit{maxGradBuffer}$ many locally computed gradients to the other models, and wait for their next update.

As an illustration, consider $Model_2$ in Figure 2. While computing $\nabla_{blue}$, it receives the yellow mini-batch from the mini-batch generator, which it starts processing immediately after it finishes the blue one. During the computation of $\nabla_{yellow}$, $Model_2$ receives $\nabla_{green}$ to apply, which it does promptly after finishing $\nabla_{yellow}$. Note that when $Model_3$ computed $\nabla_{green}$ and $Model_1$ computed $\nabla_{red}$, they had not yet applied a single gradient to their local models; thus, $|G_1| = |G_3| = 0$. However, before applying $\nabla_{green}$, $G_2 = \{\nabla_{blue}, \nabla_{yellow}\}$ with $|G_2| = 2$, and hence $\mathit{staleness}_{3,2}(\nabla_{green}) = 2$. Along the same lines, before applying $\nabla_{red}$, $|G_2| = 3$ and $\mathit{staleness}_{1,2}(\nabla_{red}) = 3$.
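The staleness metric itself is a plain symmetric set difference; a minimal sketch (with illustrative gradient identifiers) is:

    def staleness(G_i, G_j):
        """Symmetric distance |G_i ∪ G_j| - |G_i ∩ G_j| between the gradient
        sets applied by models i and j."""
        return len(G_i.symmetric_difference(G_j))

    # Figure 2 example: when Model_2 applies ∇green, its producer Model_3 had
    # applied no gradients, while G_2 = {∇blue, ∇yellow}; hence staleness = 2.
    assert staleness(set(), {"blue", "yellow"}) == 2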

3.2. Model Convergence

Since TensAIR operates on data streams and is both asynchronous and fully decentralized (i.e., it has no centralized parameter server), it exhibits characteristics that most SGD proofs of convergence (Zhang et al., 2020; Sun et al., 2017; Jiang et al., 2017) do not cover. Therefore, we next discuss under which circumstances TensAIR is guaranteed to converge.

First, we consider that training is performed between significant concept drifts; therefore, we assume that the data distribution between two subsequent concept drifts does not change. If a concept drift occurs during training, the model will not converge until the concept drift ends. Under this assumption, the data stream between two concept drifts behaves like a fixed dataset. In this case, given enough training examples, each of the local model instances will eventually converge (Goodfellow et al., 2016).

Second, considering TensAIR's decentralized and asynchronous SGD (DASGD), model updates can be stale. Nevertheless, as proven by Tosi and Theobald (2023), the model converges in this setting in up to $\mathcal{O}(\frac{\sigma}{\epsilon^2}) + \mathcal{O}(\frac{Q S_{avg}}{\epsilon^{3/2}}) + \mathcal{O}(\frac{S_{avg}}{\epsilon})$ iterations to an $\epsilon$-small error, with $S_{avg}$ the average staleness observed during training and $Q$ a constant that bounds the gradients' size. If bounded gradients are not assumed, DASGD converges in $\mathcal{O}(\frac{\sigma}{\epsilon^2}) + \mathcal{O}(\frac{\sqrt{\hat{S}_{avg}\hat{S}_{max}}}{\epsilon})$ iterations, with $\hat{S}_{max}$ and $\hat{S}_{avg}$ representing the maximum and average staleness, calculated using an additional recursive factor.

3.3. Implementation

TensAIR was implemented on top of the Asynchronous Iterative Routing (AIR) dataflow engine (Venugopal et al., 2020, 2022). AIR is a native stream-processing engine that processes complex dataflows in an asynchronous and decentralized manner. TensAIR's dataflow operators extend a basic Vertex superclass in AIR. Vertex implements AIR's asynchronous MPI protocol via multi-threaded queues of incoming and outgoing messages, which are exchanged among all nodes (aka "ranks") in the network asynchronously. This is crucial to guarantee that worker nodes do not stay idle while waiting to send or receive messages during training. The number of instances of each Vertex subclass and the number of input data streams can be configured beforehand, as seen in Figure 1.

TensAIR is implemented entirely in C++. It includes the TensorFlow 2.8 native C API to load, save, train, and predict ANN models. Therefore, it is possible to develop a TensorFlow/Keras model in Python, save the model to a file, and load it directly into TensAIR. TensAIR is completely open-source and available from our GitHub repository (https://github.com/maurodlt/TensAIR).
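As a sketch of this workflow (the model architecture and path below are illustrative, not the ones used in our experiments), a Keras model can be exported in the SavedModel format, which the TensorFlow C API can then load:

    import tensorflow as tf

    # Define and compile an arbitrary Keras model in Python ...
    model = tf.keras.Sequential([
        tf.keras.layers.Dense(128, activation="relu", input_shape=(32,)),
        tf.keras.layers.Dense(10),
    ])
    model.compile(optimizer=tf.keras.optimizers.SGD(0.01),
                  loss=tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True))

    # ... and export it as a SavedModel directory, which C/C++ code can open
    # (e.g., via TF_LoadSessionFromSavedModel in the TensorFlow C API).
    model.save("example_model")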

4. Experiments & Discussion

To assess TensAIR, we performed experiments measuring its performance on prototypical ML problems such as Word2Vec (word embeddings) and CIFAR-10 (image classification). We empirically validate TensAIR's model convergence by comparing its training-loss curves at increasing levels of distribution across both CPUs and GPUs. Our results confirm that TensAIR's DASGD updates achieve similar convergence on Word2Vec and CIFAR-10 as a synchronous SGD propagation, while we achieve a nearly linear reduction in training time on both problems. Due to this reduction, TensAIR significantly outperforms not just the current OL extensions of Apache Kafka and Flink (based on both the standard and distributed TensorFlow APIs), but also Horovod, a long-standing effort to scale out ANN training. Finally, by providing an in-depth analysis of a sentiment analysis (SA) use case on Twitter, we demonstrate the importance of OL in the presence of concept drifts (i.e., COVID-19-related tweets with changing sentiments). In particular, the SA use case is an example of a task that would be deemed too complex to be adapted in real time (at a throughput rate of up to 6,000 tweets per second) when using other OL frameworks.

HPC Setup. We carried out the experiments described in this section using the HPC facilities of the University of Luxembourg (Varrette et al., 2022). We distributed the ANN training using up to 4 Nvidia Tesla V100 GPUs in a node with 768 GB RAM. We also deployed up to 16 regular nodes, with 28 CPU cores and 128 GB RAM each, for the CPU-based (i.e., without GPU acceleration) settings.

Event Generation. We trained both sparse (word embedding, cf. https://www.tensorflow.org/tutorials/text/word2vec) and dense (image classification, cf. https://www.tensorflow.org/tutorials/images/cnn) models based on English Wikipedia articles and images from CIFAR-10 (Krizhevsky et al., 2009), respectively. Instead of connecting to actual streams, we chose these static datasets to facilitate a consistent analysis of the results and to ensure reproducibility. To simulate a streaming scenario, we implemented the MiniBatchGenerator as an entry-point Vertex operator (compare to Figure 1), which generates events $e_i$ with timestamps $s_i$, groups them into mini-batches $X_j$ using a tumbling-window semantics, and sends these mini-batches to the subsequent operators in the dataflow. Furthermore, this allows us to simulate streams of unbounded size by iterating over the datasets multiple times (in analogy to training with multiple epochs over a fixed dataset).
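A minimal sketch of such a stream simulation (our own names, not the actual MiniBatchGenerator code) could look as follows:

    import time

    def simulate_stream(dataset, b, epochs=None):
        """Yield mini-batches X_j of b (event, timestamp) pairs, looping over a
        static dataset to simulate an unbounded stream (one loop ~ one epoch)."""
        epoch = 0
        while epochs is None or epoch < epochs:
            for start in range(0, len(dataset) - b + 1, b):
                yield [(e, time.monotonic()) for e in dataset[start:start + b]]
            epoch += 1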

Sparse vs. Dense Models. We chose Word2Vec and CIFAR-10 because they represent prototypical ML problems with sparse and dense model updates, respectively. Sparse updates mean that only a small portion of the neural-network variables actually becomes updated per mini-batch (Recht et al., 2011). Hence, sparseness should assist the models' convergence when using DASGD, as observed also in Hogwild! (Recht et al., 2011). We trained by sampling 1% of English Wikipedia, which corresponds to 11.7M training examples (i.e., word pairs). CIFAR-10, on the other hand, was chosen for being dense, which lets us analyze how this characteristic possibly hinders convergence when models are distributed and updated asynchronously. We train on all 50,000 labeled images of the CIFAR-10 dataset.

4.1. Convergence Analysis

We first explored TensAIR's ability to converge by determining if and how DASGD might degrade the quality of the trained model (Figure 3). We compared the training-loss curves of Word2Vec and CIFAR-10 when distributing TensAIR models over 1 to 4 GPUs using 1 TensAIR rank per GPU (Figures 3(b) & 3(d)). We additionally explored the models' convergence when trained on distributed CPU nodes (Figures 3(a) & 3(c)). In this second scenario, we trained up to 64 ranks on 16 nodes simultaneously without GPUs. Note that, when using a single TensAIR rank, TensAIR's gradient updates behave as in a synchronous SGD implementation.

[Figure 3. Training-loss curves of Word2Vec and CIFAR-10 at increasing levels of distribution, on CPU nodes and on GPUs (panels (a)–(d)).]

The extremely low variance among all loss curves shown in Figures 3(a) and 3(b) demonstrates that our asynchronous and distributed SGD updates do not negatively affect the convergence of the Word2Vec models. We attribute this to (1) the sparseness of Word2Vec, and (2) the low staleness of the gradients (which are relatively inexpensive to compute and apply for Word2Vec). The low staleness indicates a fast exchange of gradients among the models.

In Figure 3(c), however, we observe a remarkable degradation of the loss when distributing CIFAR-10 across multiple nodes. This is because we used the same fixed learning rate in all settings: distributing dense models over multiple ranks without adapting the mini-batch size is well known to degrade the loss curve (even in synchronous settings). This degradation occurs because training $N$ models with mini-batches of size $b$ behaves similarly to training 1 model with mini-batches of size $N \cdot b$. To mitigate this issue, Horovod increases the learning rate $\alpha$ by the number of ranks used to distribute the model (hor, 2019), i.e., $\alpha_{new} = \alpha \cdot N$. Accordingly, in Figure 3(d), we again do not see any degradation of the loss when distributing CIFAR-10, because we use a maximum of 4 GPUs.

4.2. Speed-up Analysis

Next, we explore the performance of TensAIR under increasing levels of distribution and with respect to varying mini-batch sizes, over both Word2Vec and CIFAR-10. This experiment is likewise deployed on up to 64 ranks (16 nodes) and up to 4 GPUs (1 node). We observe in Figure 4 that TensAIR achieves a nearly linear scale-out under most of our settings.

In most cases, TensAIR achieves a better speedup when training with smaller mini-batches. This is because, unlike the gradient calculation, the gradient application is not distributed, and with smaller mini-batches more gradients are applied per epoch. Thus, models with expensive gradient computations will exhibit a better scale-out performance. Nevertheless, when the gradient calculation is not the bottleneck of the dataflow, one can reduce the computational impact of the gradient applications and the network impact of their broadcasts by simply increasing maxBuffer. For instance, increasing maxBuffer by a factor of $n$ is expected to reduce both the network complexity and the computational impact of the gradient applications by a factor of $n$ (e.g., with maxBuffer = 4, four locally computed gradients are combined into a single broadcast message).

[Figure 4. Speed-up of TensAIR at increasing levels of distribution and varying mini-batch sizes for Word2Vec and CIFAR-10.]

4.3. Baseline Comparison

Apart from TensAIR, it is also possible to train ANNs by using Apache Kafka and Flink as message brokers that generate data streams of varying throughputs. Kafka is already included in the standard TensorFlow I/O library (tensorflow_io), which however allows no actual distribution of the training phase (kaf, 2022). Flink, on the other hand, employs the distributed TensorFlow API (tensorflow.distribute). However, we were not able to run the provided dl-on-flink use case (dl-, 2022) even after various attempts on our HPC setup. We therefore report the direct deployment of our Word2Vec and CIFAR-10 use cases (Figures 5(a) & 5(b)) on both the standard and distributed TensorFlow APIs (the latter using the MirroredStrategy option of tensorflow.distribute). We thereby simulate a streaming scenario by feeding one mini-batch per training iteration into TensorFlow, which yields a very optimistic upper bound for the maximum throughput that Kafka and Flink could achieve. In a similar manner, we also determined the maximum throughput of Horovod (Sergeev and Del Balso, 2018), which is however not a streaming engine by default.

[Figure 5. Sustainable-throughput comparison between TensAIR, the standard and distributed TensorFlow APIs, and Horovod on Word2Vec and CIFAR-10.]

In Figures 5(a) and 5(b), we see that TensAIR clearly surpasses both the standard and distributed TensorFlow setups as well as Horovod. This occurs because, as opposed to TensAIR, their architectures were not designed to train on batches arriving from data streams. Thus, in a streaming scenario, the overhead of transferring the training data to the worker nodes grows with the number of training steps. TensAIR, on the other hand, was designed to train ANN models from high-throughput data streams in real time: the data-transfer overhead is mitigated by the asynchronous protocol, and the training is sped up by DASGD. This allows TensAIR to (1) reduce both computational resources and idle times while the data is being transferred, and (2) maintain an optimized buffer management for incoming mini-batches and outgoing gradients, respectively.

In our experiments, we could sustain a maximum training rate of 285,560 training examples per second on Word2Vec and 200,000 images per second on CIFAR-10, which corresponds to sustainable throughputs of 14.16 MB/s and 585 MB/s, respectively. We reached these values by training with 3 GPUs on Word2Vec and 4 GPUs on CIFAR-10. Note that, when using more than 3 GPUs simultaneously, TensAIR did not achieve a better sustainable throughput in the W2V use case due to the relatively low complexity of the gradient calculations: the training bottleneck, typically associated with the gradient calculation, shifted to the gradient application, which is not distributed. Nevertheless, this issue can be mitigated by simply increasing the variable maxBuffer (as explained in Section 4.2). This adjustment delays the communication among the distributed models while reducing the number of locally applied remote gradients by a factor of maxBuffer.

4.4. Sentiment Analysis of COVID-19

Here, we exemplify the benefits of training an ANN in real time from streaming data. To this end, we analyze the impact of concept drifts on a sentiment-analysis setting, specifically drifts that occurred during and due to the COVID-19 pandemic. First, we trained a large Word2Vec model using 20% of English Wikipedia plus the Sentiment140 dataset (Go et al., 2009). Then, we trained an LSTM model (ten, 2022) using the Sentiment140 dataset together with the word embeddings we trained previously. After three epochs, we reached 78% accuracy on both the training and the test set. However, language is always evolving; thus, this model may not sustain its accuracy for long when deployed to analyze streaming data in real time. We exemplify this by fine-tuning the word embeddings with 2M additional tweets published from November 1st, 2019 to October 10th, 2021 containing the following keywords: covid19, corona, coronavirus, pandemic, quarantine, lockdown, sarscov2. Then, we compared the previously trained word embeddings with the fine-tuned ones and found an average cosine difference of only 2%. However, despite being small, this difference is concentrated on specific keywords.

Table 1. Cosine difference between pre-trained and fine-tuned embeddings for the most-affected terms.

Term        rt      corona   pandemic   booster   2021
Difference  0.728   0.658    0.646      0.625     0.620

As shown in Table 1, keywords related to the COVID-19 pandemic are the ones that suffered most from concept drift. Take as examples pandemic, booster, and corona, which had over 62% of cosine difference before and after the Word2Vec models were updated. Due to the concept drift, the sentiment of specific terms and, consequently, of entire tweets also changed. One observes this change by comparing the output of our LSTM model when (1) inputting tweets embedded with the pre-trained word embeddings, and (2) inputting tweets embedded with the fine-tuned word embeddings. Take as an example the sentence "I got corona.", which had a sentiment of +2.0463 when predicted with the pre-trained embeddings, and of -2.4873 when predicted with the fine-tuned embeddings. Considering that the higher the sentiment value, the more positive the tweet, we observe that corona (also the brand of a beer) used to be seen as positive and is now related to a very negative sentiment.
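The drift measurement itself reduces to an average cosine difference between the two embedding tables; a minimal sketch (our function names, not the evaluation code we used) is:

    import numpy as np

    def cosine_difference(u, v):
        """1 - cos(u, v): 0 means unchanged, larger values mean stronger drift."""
        return 1.0 - np.dot(u, v) / (np.linalg.norm(u) * np.linalg.norm(v))

    def average_drift(emb_old, emb_new):
        """emb_old, emb_new: dicts mapping words to vectors (same vocabulary)."""
        diffs = [cosine_difference(emb_old[w], emb_new[w]) for w in emb_old]
        return sum(diffs) / len(diffs)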

To tackle concept drifts in this use case, we argue that TensAIR with its OL components (as depicted in Figure 6) could readily be deployed. A real-time pipeline with Twitter would allow us to constantly update the word embeddings (our sustainable throughput would be more than sufficient compared to the estimated throughput of Twitter). Consequently, the sentiment-analysis algorithm would always be up-to-date with respect to such concept drifts.

[Figure 6. TensAIR dataflow for the sentiment-analysis (SA) use case on a Twitter data stream.]

Figure 6 depicts the dataflow for a sentiment-analysis (SA) use case on a Twitter data stream. This dataflow predicts the sentiments of live tweets using a pre-trained ANN model (Model^SA). However, it does not rely on pre-defined word embeddings: the dataflow constantly improves its embeddings in a second Word2Vec (W2V) ANN model (Model^W2V), which it trains using the same input stream as used for the predictions. By following a passive concept-drift adaptation strategy, it can adapt its sentiment predictions in real time based on changing word distributions among the input tweets. Moreover, it does not require any sentiment labels for newly streamed tweets at Model^SA, since only Model^W2V is re-trained, in a self-supervised manner, by generating mini-batches of word pairs $(x, y)$ directly from the input tweets.

Our SA dataflow starts with Map, which receives tweets from a Twitter input stream (implemented via cURL or a file interface) and tokenizes the tweets based on the same word dictionary also used by Model^W2V and Model^SA. Split then identifies whether the tokenized tweets shall be used for re-training the word embeddings, for sentiment prediction, or for both. If the tokenized tweets are selected for training, they are turned into mini-batches via the UDF operator. The $(x, y)$ word pairs in each mini-batch $X$ are sharded across Model^W2V_1, ..., Model^W2V_n with a standard hash-partitioner using the words $x$ as keys. Model^W2V implements a default skip-gram model. If the tokenized tweets are selected for prediction, a tweet is vectorized using the word embeddings obtained from any of the Model^W2V instances and sent to the pre-trained Model^SA, which then predicts the tweets' sentiments.
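The hash-partitioning step can be sketched as follows (a simplified, single-process illustration with our own names; a production partitioner would use a stable hash rather than Python's per-process salted one):

    def shard_pairs(pairs, n):
        """pairs: iterable of (x, y) skip-gram word pairs; returns n shards,
        so that the same center word x always lands on the same W2V instance."""
        shards = [[] for _ in range(n)]
        for x, y in pairs:
            shards[hash(x) % n].append((x, y))   # the word x acts as routing key
        return shards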

5. Conclusions

OL is an emerging area of research that has not yet extensively explored the real-time training of ANNs. In this paper, we introduced TensAIR, a novel system for the real-time training of ANNs from data streams. It uses the asynchronous iterative routing (AIR) protocol to train and predict with ANNs in a decentralized manner. The two main features of TensAIR are: (1) leveraging the iterative nature of SGD by updating the ANN model with fresh samples from the data stream instead of relying on buffered or pre-defined datasets; and (2) its fully asynchronous and decentralized architecture, which updates the ANN models using decentralized and asynchronous SGD (DASGD). Due to these two features, TensAIR achieves a nearly linear scale-out performance in terms of sustainable throughput and with respect to its number of worker nodes. Moreover, it was implemented using TensorFlow, which facilitates the deployment of diverse use cases. We therefore highlight the following capabilities of TensAIR: (1) processing multiple data streams simultaneously; (2) training models using either CPUs, GPUs, or both; (3) training ANNs in an asynchronous and distributed manner; and (4) incorporating user-defined dataflow pipelines. We empirically demonstrated that, in a real-time streaming scenario, TensAIR sustains from 4 to 120 times higher throughput than Horovod and both the standard and distributed TensorFlow APIs (representing upper bounds for the Apache Kafka and Flink extensions).

As future work, we believe that TensAIR may also enable novel online-learning use cases that were previously considered too complex but now become feasible due to TensAIR's very good sustainable throughput. Specifically, we intend to study similar learning tasks over audio/video streams, which we see as the main target domain for stream processing and OL. To reduce the computational cost of training an ANN indefinitely, we shall also investigate how different active concept-drift detection algorithms behave in an OL setting with ANNs.

Acknowledgements.

This work is funded by the Luxembourg National Research Fund under the PRIDE program (PRIDE17/12252781). The paper benefited from helpful comments and suggestions by Ovidiu Cristian Marcu. The experiments presented in this paper were carried out using the HPC facilities of the University of Luxembourg (Varrette et al., 2022) (see hpc.uni.lu).

References

  • hor (2019). Horovod with Keras. https://horovod.readthedocs.io/en/stable/keras.html. Accessed: 2022-05-18.
  • dl- (2022). Deep Learning on Flink. https://github.com/flink-extended/dl-on-flink. Accessed: 2022-08-05.
  • kaf (2022). Robust machine learning on streaming data using Kafka and Tensorflow-IO. https://www.tensorflow.org/io/tutorials/kafka. Accessed: 2022-08-05.
  • ten (2022). TensorFlow. https://www.tensorflow.org/text/tutorials/text_classification_rnn. Accessed: 2022-05-27.
  • Akidau et al. (2015). Tyler Akidau, Robert Bradshaw, Craig Chambers, Slava Chernyak, Rafael J. Fernández-Moctezuma, Reuven Lax, Sam McVeety, Daniel Mills, Frances Perry, Eric Schmidt, and Sam Whittle. 2015. The Dataflow Model: A Practical Approach to Balancing Correctness, Latency, and Cost in Massive-Scale, Unbounded, out-of-Order Data Processing. Proc. VLDB Endow. 8, 12 (2015), 1792–1803. https://doi.org/10.14778/2824032.2824076
  • Barros and Santos (2018). Roberto Souto Maior Barros and Silas Garrido T. Carvalho Santos. 2018. A large-scale comparison of concept drift detectors. Information Sciences 451 (2018), 348–370.
  • Bornstein et al. (2023). Marco Bornstein, Tahseen Rabbani, Evan Wang, Amrit S. Bedi, and Furong Huang. 2023. SWIFT: Rapid Decentralized Federated Learning via Wait-Free Model Communication. In The Eleventh International Conference on Learning Representations (ICLR 2023), Kigali, Rwanda, May 1–5, 2023. OpenReview.net. https://openreview.net/pdf?id=jh1nCir1R3d
  • Carbone et al. (2015). Paris Carbone, Asterios Katsifodimos, Stephan Ewen, Volker Markl, Seif Haridi, and Kostas Tzoumas. 2015. Apache Flink: Stream and batch processing in a single engine. Bulletin of the IEEE Computer Society Technical Committee on Data Engineering 36, 4 (2015).
  • Chen et al. (2019). Chen Chen, Wei Wang, and Bo Li. 2019. Round-robin synchronization: Mitigating communication bottlenecks in parameter servers. In IEEE INFOCOM 2019 – IEEE Conference on Computer Communications. IEEE, 532–540.
  • Go et al. (2009). Alec Go, Richa Bhayani, and Lei Huang. 2009. Twitter sentiment classification using distant supervision. CS224N project report, Stanford 1, 12 (2009).
  • Gomes et al. (2019). Heitor Murilo Gomes, Jesse Read, Albert Bifet, Jean Paul Barddal, and João Gama. 2019. Machine learning for streaming data: state of the art, challenges, and opportunities. ACM SIGKDD Explorations Newsletter 21, 2 (2019), 6–22.
  • Goodfellow et al. (2016). Ian Goodfellow, Yoshua Bengio, and Aaron Courville. 2016. Deep Learning. MIT Press. http://www.deeplearningbook.org
  • Heusinger et al. (2020). Moritz Heusinger, Christoph Raab, and Frank-Michael Schleif. 2020. Passive concept drift handling via variations of learning vector quantization. Neural Computing and Applications (2020), 1–12.
  • Hoi et al. (2021). Steven C. H. Hoi, Doyen Sahoo, Jing Lu, and Peilin Zhao. 2021. Online learning: A comprehensive survey. Neurocomputing 459 (2021), 249–289.
  • Hu et al. (2020). Hanqing Hu, Mehmed Kantardzic, and Tegjyot S. Sethi. 2020. No Free Lunch Theorem for concept drift detection in streaming data classification: A review. Wiley Interdisciplinary Reviews: Data Mining and Knowledge Discovery 10, 2 (2020), e1327.
  • Iwashita and Papa (2018). Adriana Sayuri Iwashita and Joao Paulo Papa. 2018. An overview on concept drift learning. IEEE Access 7 (2018), 1532–1547.
  • Jiang et al. (2017). Jiawei Jiang, Bin Cui, Ce Zhang, and Lele Yu. 2017. Heterogeneity-aware distributed parameter servers. In Proceedings of the 2017 ACM International Conference on Management of Data. 463–478.
  • Karimov et al. (2018). Jeyhun Karimov, Tilmann Rabl, Asterios Katsifodimos, Roman Samarev, Henri Heiskanen, and Volker Markl. 2018. Benchmarking distributed stream data processing systems. In 2018 IEEE 34th International Conference on Data Engineering (ICDE). 1507–1518.
  • Kreps et al. (2011). Jay Kreps, Neha Narkhede, Jun Rao, et al. 2011. Kafka: A distributed messaging system for log processing. In Proceedings of the NetDB, Vol. 11. 1–7.
  • Krizhevsky et al. (2009). Alex Krizhevsky, Geoffrey Hinton, et al. 2009. Learning multiple layers of features from tiny images. Technical Report. University of Toronto, Department of Computer Science.
  • Kulkarni et al. (2015). Sanjeev Kulkarni, Nikunj Bhagat, Maosong Fu, Vikas Kedigehalli, Christopher Kellogg, Sailesh Mittal, Jignesh M. Patel, Karthik Ramasamy, and Siddarth Taneja. 2015. Twitter Heron: Stream Processing at Scale. In Proceedings of the 2015 ACM SIGMOD International Conference on Management of Data (SIGMOD '15). ACM, New York, NY, USA, 239–250.
  • Lian et al. (2018). Xiangru Lian, Wei Zhang, Ce Zhang, and Ji Liu. 2018. Asynchronous decentralized parallel stochastic gradient descent. In International Conference on Machine Learning. PMLR, 3043–3052.
  • Lu et al. (2018). Jie Lu, Anjin Liu, Fan Dong, Feng Gu, Joao Gama, and Guangquan Zhang. 2018. Learning under concept drift: A review. IEEE Transactions on Knowledge and Data Engineering 31, 12 (2018), 2346–2363.
  • Mayer and Jacobsen (2020). Ruben Mayer and Hans-Arno Jacobsen. 2020. Scalable deep learning on distributed infrastructures: Challenges, techniques, and tools. ACM Computing Surveys (CSUR) 53, 1 (2020), 1–37.
  • Noghabi et al. (2017). Shadi A. Noghabi, Kartik Paramasivam, Yi Pan, Navina Ramesh, Jon Bringhurst, Indranil Gupta, and Roy H. Campbell. 2017. Samza: stateful scalable stream processing at LinkedIn. Proceedings of the VLDB Endowment 10, 12 (2017), 1634–1645.
  • Ouyang et al. (2021). Shuo Ouyang, Dezun Dong, Yemao Xu, and Liquan Xiao. 2021. Communication optimization strategies for distributed deep neural network training: A survey. J. Parallel and Distrib. Comput. 149 (2021), 52–65.
  • Priya and Uthra (2021). S. Priya and R. Annie Uthra. 2021. Deep learning framework for handling concept drift and class imbalanced complex decision-making on streaming data. Complex & Intelligent Systems (2021), 1–17.
  • Recht et al. (2011). Benjamin Recht, Christopher Re, Stephen Wright, and Feng Niu. 2011. Hogwild!: A lock-free approach to parallelizing stochastic gradient descent. Advances in Neural Information Processing Systems 24 (2011).
  • Robbins and Monro (1951). Herbert Robbins and Sutton Monro. 1951. A stochastic approximation method. The Annals of Mathematical Statistics (1951), 400–407.
  • Sergeev and Del Balso (2018). Alexander Sergeev and Mike Del Balso. 2018. Horovod: fast and easy distributed deep learning in TensorFlow. arXiv preprint arXiv:1802.05799 (2018).
  • Sethi and Kantardzic (2017). Tegjyot Singh Sethi and Mehmed Kantardzic. 2017. On the reliable detection of concept drift from streaming unlabeled data. Expert Systems with Applications 82 (2017), 77–99.
  • Sun et al. (2017). Tao Sun, Robert Hannah, and Wotao Yin. 2017. Asynchronous coordinate descent under more realistic assumptions. Advances in Neural Information Processing Systems 30 (2017).
  • Tosi et al. (2022). Mauro D. L. Tosi, Vinu Ellampallil Venugopal, and Martin Theobald. 2022. Convergence-Time Analysis of Asynchronous Distributed Artificial Neural Networks. In 5th Joint International Conference on Data Science & Management of Data (CODS/COMAD). 314–315.
  • Tosi and Theobald (2023). Mauro D. L. Tosi and Martin Theobald. 2023. Convergence Analysis of Decentralized ASGD. arXiv e-prints (2023), arXiv:2309.
  • Varrette et al. (2022). S. Varrette, H. Cartiaux, S. Peter, E. Kieffer, T. Valette, and A. Olloh. 2022. Management of an Academic HPC & Research Computing Facility: The ULHPC Experience 2.0. In Proc. of the 6th ACM High Performance Computing and Cluster Technologies Conf. (HPCCT 2022). ACM, Fuzhou, China.
  • Venugopal et al. (2020). Vinu E. Venugopal, Martin Theobald, Samira Chaychi, and Amal Tawakuli. 2020. AIR: A light-weight yet high-performance dataflow engine based on asynchronous iterative routing. In 2020 IEEE 32nd International Symposium on Computer Architecture and High Performance Computing (SBAC-PAD). 51–58.
  • Venugopal et al. (2022). Vinu Ellampallil Venugopal, Martin Theobald, Damien Tassetti, Samira Chaychi, and Amal Tawakuli. 2022. Targeting a Light-Weight and Multi-Channel Approach for Distributed Stream Processing. J. Parallel and Distrib. Comput. (2022).
  • Zaharia et al. (2016). Matei Zaharia, Reynold S. Xin, Patrick Wendell, Tathagata Das, Michael Armbrust, Ankur Dave, Xiangrui Meng, Josh Rosen, Shivaram Venkataraman, Michael J. Franklin, et al. 2016. Apache Spark: a unified engine for big data processing. Commun. ACM 59, 11 (2016), 56–65.
  • Zhang et al. (2020). Xin Zhang, Jia Liu, and Zhengyuan Zhu. 2020. Taming convergence for asynchronous stochastic gradient descent with unbounded delay in non-convex learning. In 2020 59th IEEE Conference on Decision and Control (CDC). 3580–3585.