If you have not heard of TensorFlow, please teach me the ways of the Jedi! From the project website:
TensorFlow is an end-to-end open source platform for machine learning. It has a comprehensive, flexible ecosystem of tools, libraries and community resources that lets researchers push the state-of-the-art in ML and developers easily build and deploy ML powered applications.
People on the street `import tensorflow as tf`, and so will the code below. Frankly, I am not a machine learning engineer,
but I thought it would be cool to download the new Kafka-IO wrappers for producing and consuming
TensorFlow-native record formats straight from Kafka, without intermediaries, and check whether they work with Redpanda… and of course it worked on the first try :D
Install redpanda
In this tutorial, I’ll only cover the Ubuntu install, but head over to our GitHub repo, give us a star, and find the download instructions for your operating system.
curl -1sLf 'https://packages.vectorized.io/nzc4ZYQK3WRGd9sy/redpanda/cfg/setup/bash.deb.sh' | sudo -E bash
sudo apt-get install redpanda && sudo systemctl start redpanda
Install deps on your machine
pip3 install --user kafka-python tensorflow tensorflow-io scikit-learn pandas
Copy this gist
I copied this gist from the main GitHub repository of the new TensorFlow I/O project (tensorflow/io): https://github.com/tensorflow/io
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Train a Keras classifier on the SUSY dataset streamed through Kafka.

Steps:
  1. Load the first 100,000 rows of SUSY.csv.gz and split them 60/40 into
     train/test sets.
  2. Write each set to its own topic. The CSV feature row is the message
     value and the class label is the message key.
  3. Train on ``susy-train``, then evaluate on ``susy-test``.
  4. Keep training "online" on batches consumed from ``susy-train`` while a
     background thread re-publishes the training data.

Expects a broker at BOOTSTRAP_SERVERS and SUSY.csv.gz in the working
directory.
"""
# NOTE(review): os, datetime and json are currently unused.
import os
from datetime import datetime
import time
import threading
import json

from kafka import KafkaProducer
from kafka.errors import KafkaError
from sklearn.model_selection import train_test_split
import pandas as pd
import tensorflow as tf
import tensorflow_io as tfio

BOOTSTRAP_SERVERS = "127.0.0.1:9092"

COLUMNS = [
    # labels
    'class',
    # low-level features
    'lepton_1_pT',
    'lepton_1_eta',
    'lepton_1_phi',
    'lepton_2_pT',
    'lepton_2_eta',
    'lepton_2_phi',
    'missing_energy_magnitude',
    'missing_energy_phi',
    # high-level derived features
    'MET_rel',
    'axial_MET',
    'M_R',
    'M_TR_2',
    'R',
    'MT2',
    'S_R',
    'M_Delta_R',
    'dPhi_r_b',
    'cos(theta_r1)',
]

# Every column except the 'class' label is a model input feature.
NUM_COLUMNS = len(COLUMNS) - 1
# One float default per feature, so tf.io.decode_csv parses every field
# as a float.
RECORD_DEFAULTS = [[0.0] for _ in range(NUM_COLUMNS)]

# Consumer settings shared by the consumer-group streaming datasets.
CONSUMER_CONFIGURATION = [
    "session.timeout.ms=7000",
    "max.poll.interval.ms=8000",
    "auto.offset.reset=earliest",
]

BATCH_SIZE = 64
SHUFFLE_BUFFER_SIZE = 64
OPTIMIZER = "adam"
# The model's last layer applies a sigmoid, so it emits probabilities,
# not logits. from_logits=True would feed those probabilities through a
# second sigmoid inside the loss and train against the wrong objective.
LOSS = tf.keras.losses.BinaryCrossentropy(from_logits=False)
METRICS = ['accuracy']
EPOCHS = 10


def _to_csv_lines(frame):
    """Render a DataFrame/Series as CSV rows: no index, no header, no blank lines."""
    # splitlines() also removes the '\r' of a '\r\n' line terminator, which
    # split("\n") would leave attached to every row.
    text = frame.to_csv(index=False, header=False)
    return [line for line in text.splitlines() if line]


def load_susy_splits(path="SUSY.csv.gz", nrows=100000, test_size=0.4):
    """Load the first ``nrows`` SUSY rows and split them into CSV lines.

    Args:
        path: location of the (optionally gzipped) SUSY CSV file.
        nrows: number of leading rows to read.
        test_size: fraction of rows held out for testing.

    Returns:
        ``(x_train, y_train, x_test, y_test)``, four lists of strings. Each
        ``x`` entry is one comma-separated feature row and the ``y`` entry
        at the same index is its class label.
    """
    # nrows reads the same leading rows as next(read_csv(chunksize=nrows))
    # without leaving a chunk iterator (and its open file) behind.
    susy_df = pd.read_csv(path, header=None, names=COLUMNS, nrows=nrows)
    print("Number of datapoints and columns: ", len(susy_df), len(susy_df.columns))
    print(
        "Number of datapoints per class (0: background noise, 1: signal): ",
        int((susy_df["class"] == 0).sum()),
        int((susy_df["class"] == 1).sum()),
    )

    train_df, test_df = train_test_split(susy_df, test_size=test_size, shuffle=True)
    print("Number of training samples: ", len(train_df))
    print("Number of testing sample: ", len(test_df))

    # The labels become the Kafka message keys so that records can be
    # spread over multiple partitions and read back with consumer groups.
    x_train = _to_csv_lines(train_df.drop(["class"], axis=1))
    y_train = _to_csv_lines(train_df["class"])
    x_test = _to_csv_lines(test_df.drop(["class"], axis=1))
    y_test = _to_csv_lines(test_df["class"])
    return x_train, y_train, x_test, y_test


def _produce(topic_name, items):
    """Send ``(message, key)`` string pairs to ``topic_name``; return the count.

    Raises:
        KafkaError: if any record could not be delivered.
    """
    failures = []
    count = 0
    producer = KafkaProducer(bootstrap_servers=[BOOTSTRAP_SERVERS])
    try:
        for message, key in items:
            future = producer.send(
                topic_name,
                key=key.encode('utf-8'),
                value=message.encode('utf-8'),
            )
            # kafka-python logs, rather than propagates, exceptions raised
            # inside an errback (which may run on the producer's I/O
            # thread). Raising there would never reach this code, so
            # failures are recorded here and reported after flush().
            future.add_errback(failures.append)
            count += 1
        # flush() blocks until every outstanding send has completed.
        producer.flush()
    finally:
        producer.close()
    if failures:
        raise KafkaError(
            'Error while sending data to kafka: {0} ({1} of {2} records failed)'.format(
                str(failures[0]), len(failures), count
            )
        )
    return count


def write_to_kafka(topic_name, items):
    """Write ``(message, key)`` string pairs to ``topic_name`` and report the count."""
    count = _produce(topic_name, items)
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))


def write_to_kafka_after_sleep(topic_name, items):
    """Wait 30 seconds, then write ``(message, key)`` pairs to ``topic_name``."""
    time.sleep(30)
    print("#"*100)
    print("Writing messages into topic: {0} after a nice sleep !".format(topic_name))
    print("#"*100)
    count = _produce(topic_name, items)
    print("#"*100)
    print("Wrote {0} messages into topic: {1}".format(count, topic_name))
    print("#"*100)


def _decode_csv_record(raw_message, raw_key):
    """Parse a CSV feature row and its label key into ``(features, label)`` tensors."""
    message = tf.io.decode_csv(raw_message, RECORD_DEFAULTS)
    key = tf.strings.to_number(raw_key)
    return (message, key)


def decode_kafka_item(item):
    """Decode an item from ``tfio.IODataset.from_kafka`` (has ``.message``/``.key``)."""
    return _decode_csv_record(item.message, item.key)


# The streaming datasets yield (message, key) pairs directly.
decode_kafka_test_item = _decode_csv_record
decode_kafka_online_item = _decode_csv_record


def make_train_dataset():
    """Return shuffled, decoded batches read from ``susy-train``."""
    # NOTE(review): only partition 0 is read (from offset 0). If the topic
    # has several partitions, records in the other partitions are skipped.
    train_ds = tfio.IODataset.from_kafka('susy-train', partition=0, offset=0)
    train_ds = train_ds.shuffle(buffer_size=SHUFFLE_BUFFER_SIZE)
    train_ds = train_ds.map(decode_kafka_item)
    return train_ds.batch(BATCH_SIZE)


def make_test_dataset():
    """Return decoded batches consumed from ``susy-test`` via consumer group ``testcg``."""
    test_ds = tfio.experimental.streaming.KafkaGroupIODataset(
        topics=["susy-test"],
        group_id="testcg",
        servers=BOOTSTRAP_SERVERS,
        stream_timeout=10000,
        configuration=CONSUMER_CONFIGURATION,
    )
    test_ds = test_ds.map(decode_kafka_test_item)
    return test_ds.batch(BATCH_SIZE)


def build_model():
    """Build and compile the fully connected binary classifier."""
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(NUM_COLUMNS,)),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.2),
        tf.keras.layers.Dense(256, activation='relu'),
        tf.keras.layers.Dropout(0.4),
        tf.keras.layers.Dense(128, activation='relu'),
        tf.keras.layers.Dropout(0.4),
        tf.keras.layers.Dense(1, activation='sigmoid'),
    ])
    # summary() prints the table itself and returns None; wrapping it in
    # print() only added a stray "None" line.
    model.summary()
    model.compile(optimizer=OPTIMIZER, loss=LOSS, metrics=METRICS)
    return model


def train_online(model, x_train, y_train):
    """Keep training ``model`` on mini-batches polled from ``susy-train``.

    A daemon thread re-publishes the training data after a 30 s sleep, so
    new messages arrive while the loop is consuming. The loop presumably
    ends when ``stream_timeout`` elapses without new data (-1 would block
    forever, per the inline comment).
    """
    online_train_ds = tfio.experimental.streaming.KafkaBatchIODataset(
        topics=["susy-train"],
        group_id="cgonline",
        servers=BOOTSTRAP_SERVERS,
        stream_timeout=30000,  # in milliseconds, to block indefinitely, set it to -1.
        configuration=CONSUMER_CONFIGURATION,
    )

    # Daemon, so a writer that is still sleeping does not keep the process
    # alive once the online loop has finished.
    thread = threading.Thread(
        target=write_to_kafka_after_sleep,
        args=("susy-train", zip(x_train, y_train)),
        daemon=True,
    )
    thread.start()

    for mini_ds in online_train_ds:
        mini_ds = mini_ds.shuffle(buffer_size=32)
        mini_ds = mini_ds.map(decode_kafka_online_item)
        mini_ds = mini_ds.batch(32)
        model.fit(mini_ds, epochs=3)


def main():
    """Run the produce -> train -> evaluate -> online-train pipeline."""
    print("tensorflow-io version: {}".format(tfio.__version__))
    print("tensorflow version: {}".format(tf.__version__))

    x_train, y_train, x_test, y_test = load_susy_splits()
    write_to_kafka("susy-train", zip(x_train, y_train))
    write_to_kafka("susy-test", zip(x_test, y_test))

    train_ds = make_train_dataset()
    model = build_model()
    model.fit(train_ds, epochs=EPOCHS)

    res = model.evaluate(make_test_dataset())
    print("test loss, test acc:", res)

    train_online(model, x_train, y_train)


if __name__ == "__main__":
    main()
Download the data
curl -sSOL https://archive.ics.uci.edu/ml/machine-learning-databases/00279/SUSY.csv.gz
Profit!
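I had to install the CUDA libraries to get my GPU humming, but after a quick install I was up and running! (Note: in the log below, TensorFlow actually failed to initialize the GPU. The kernel driver version 455.28.0 does not match libcuda 460.27.4, so this particular run trained on the CPU.)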
❯ python3 main.py
2021-01-11 22:23:02.910794: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcudart.so.11.0
tensorflow-io version: 0.17.0
tensorflow version: 2.4.0
Number of training samples: 60000
Number of testing sample: 40000
Wrote 60000 messages into topic: susy-train
Wrote 40000 messages into topic: susy-test
2021-01-11 22:23:10.847745: I tensorflow_io/core/kernels/cpu_check.cc:128] Your CPU supports instructions that this TensorFlow IO binary was not compiled to use: AVX2 FMA
2021-01-11 22:23:10.902048: I tensorflow/compiler/jit/xla_cpu_device.cc:41] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-11 22:23:10.902534: I tensorflow/stream_executor/platform/default/dso_loader.cc:49] Successfully opened dynamic library libcuda.so.1
2021-01-11 22:23:10.928441: E tensorflow/stream_executor/cuda/cuda_driver.cc:328] failed call to cuInit: CUDA_ERROR_NO_DEVICE: no CUDA-capable device is detected
2021-01-11 22:23:10.928471: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:169] retrieving CUDA diagnostic information for host: darktower
2021-01-11 22:23:10.928483: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:176] hostname: darktower
2021-01-11 22:23:10.928573: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:200] libcuda reported version is: 460.27.4
2021-01-11 22:23:10.928598: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:204] kernel reported version is: 455.28.0
2021-01-11 22:23:10.928608: E tensorflow/stream_executor/cuda/cuda_diagnostics.cc:313] kernel version 455.28.0 does not match DSO version 460.27.4 -- cannot find working devices in this configuration
2021-01-11 22:23:10.931185: I tensorflow/compiler/jit/xla_gpu_device.cc:99] Not creating XLA devices, tf_xla_enable_xla_devices not set
2021-01-11 22:23:11.935134: I tensorflow_io/core/kernels/kafka_kernels.cc:349] Kafka tail: 59449
Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #
=================================================================
dense (Dense)                (None, 128)               2432
_________________________________________________________________
dropout (Dropout)            (None, 128)               0
_________________________________________________________________
dense_1 (Dense)              (None, 256)               33024
_________________________________________________________________
dropout_1 (Dropout)          (None, 256)               0
_________________________________________________________________
dense_2 (Dense)              (None, 128)               32896
_________________________________________________________________
dropout_2 (Dropout)          (None, 128)               0
_________________________________________________________________
dense_3 (Dense)              (None, 1)                 129
=================================================================
Total params: 68,481
Trainable params: 68,481
Non-trainable params: 0
_________________________________________________________________
None
2021-01-11 22:23:12.036891: I tensorflow/compiler/mlir/mlir_graph_optimization_pass.cc:116] None of the MLIR optimization passes are enabled (registered 2)
2021-01-11 22:23:12.038427: I tensorflow/core/platform/profile_utils/cpu_utils.cc:112] CPU Frequency: 2894530000 Hz
Epoch 1/10
2021-01-11 22:23:12.283066: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 0
1/Unknown - 0s 415ms/step - loss: 0.7726 - accuracy: 0.3750
2021-01-11 22:23:12.518990: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 1024
29/Unknown - 1s 20ms/step - loss: 0.6915 - accuracy: 0.5420
2021-01-11 22:23:13.043534: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 2048
46/Unknown - 2s 25ms/step - loss: 0.6721 - accuracy: 0.5739
2021-01-11 22:23:13.575452: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 3072
62/Unknown - 2s 27ms/step - loss: 0.6572 - accuracy: 0.5951
2021-01-11 22:23:14.102275: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 4096
78/Unknown - 3s 28ms/step - loss: 0.6444 - accuracy: 0.6115
2021-01-11 22:23:14.638575: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 5120
93/Unknown - 3s 29ms/step - loss: 0.6345 - accuracy: 0.6233
2021-01-11 22:23:15.167787: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 6144
109/Unknown - 4s 30ms/step - loss: 0.6251 - accuracy: 0.6344
2021-01-11 22:23:15.696971: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 7168
126/Unknown - 4s 30ms/step - loss: 0.6165 - accuracy: 0.6443
2021-01-11 22:23:16.213187: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 8192
128/Unknown - 5s 33ms/step - loss: 0.6155 - accuracy: 0.6454
2021-01-11 22:23:16.733228: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 9216
144/Unknown - 5s 33ms/step - loss: 0.6086 - accuracy: 0.6531
2021-01-11 22:23:17.252568: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 10240
160/Unknown - 6s 33ms/step - loss: 0.6024 - accuracy: 0.6598
2021-01-11 22:23:17.771701: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 11264
190/Unknown - 6s 31ms/step - loss: 0.5925 - accuracy: 0.6703
2021-01-11 22:23:18.295851: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 12288
192/Unknown - 7s 33ms/step - loss: 0.5920 - accuracy: 0.6709
2021-01-11 22:23:18.815791: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 13312
222/Unknown - 7s 31ms/step - loss: 0.5842 - accuracy: 0.6790
2021-01-11 22:23:19.343171: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 14336
224/Unknown - 8s 33ms/step - loss: 0.5837 - accuracy: 0.6795
2021-01-11 22:23:19.857738: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 15360
254/Unknown - 8s 31ms/step - loss: 0.5770 - accuracy: 0.6863
2021-01-11 22:23:20.379426: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 16384
270/Unknown - 9s 31ms/step - loss: 0.5737 - accuracy: 0.6895
2021-01-11 22:23:20.899491: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 17408
286/Unknown - 9s 31ms/step - loss: 0.5706 - accuracy: 0.6925
2021-01-11 22:23:21.420403: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 18432
302/Unknown - 10s 31ms/step - loss: 0.5677 - accuracy: 0.6952
2021-01-11 22:23:21.941930: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 19456
304/Unknown - 10s 33ms/step - loss: 0.5674 - accuracy: 0.6956
2021-01-11 22:23:22.461011: I tensorflow_io/core/kernels/kafka_kernels.cc:248] Kafka stream starts with current offset: 20480
Note: I timed myself copying the gists into my terminal, and it took a bit less than 3 minutes.
Sign up for our Community Slack (here!) and engage with us on Twitter via @redpandadata, or reach me personally at @emaxerrno.