# Stream Data to Event Hubs Using Kafka

This notebook illustrates how Spark can be used to repeatedly send batches of data to a Kafka sink, specifically an Azure Event Hub accessed through its Kafka API.

First, we will create a dataframe with some made-up data.  Then we will run a loop that repeatedly draws a random sample from that dataframe and sends it to the Event Hub.  The process will continue to run until you stop it;
don't forget to shut it down when you're done to save costs.

**NOTE:** Your event hub must be on the "Standard" pricing tier or higher.  The "Basic" tier does not support the Kafka protocol.

In [0]:
# SETTINGS
# You need to customize the values below to match your environment.
# To keep secrets out of the notebook itself, each value can instead be supplied
# through an environment variable; the literal placeholder is used only when the
# corresponding variable is not set.
import os


# This is the connection string for your event hub instance or your event hub namespace.  You can retrieve this from the Azure portal.  Note that it must include a Shared Access Key with "Send" permission.
event_hub_connection_string = os.environ.get("EVENT_HUB_CONNECTION_STRING", "<YOUR_EVENT_HUB_CONNECTION_STRING>")


# This is the name of the event hub instance within your event hub namespace
topic_name = os.environ.get("EVENT_HUB_NAME", "<YOUR_EVENT_HUB_NAME>")


In [0]:
import pandas as pd

# Each small frame lists the possible values of one product attribute.  A constant
# 'key' column is added to all of them so that successive outer merges on 'key'
# produce the full cross product of every attribute combination.
dfs = [
    pd.DataFrame(['red', 'blue', 'green', 'yellow', 'orange'], columns=['color']),
    pd.DataFrame(['north', 'south', 'east', 'west'], columns=['region']),
    pd.DataFrame(['basic', 'premium'], columns=['tier']),
    pd.DataFrame(['glossy', 'matte'], columns=['finish']),
    pd.DataFrame(['small', 'medium', 'large'], columns=['size']),
    pd.DataFrame(['delivery', 'pickup'], columns=['transit']),
    pd.DataFrame(['cardboard', 'plastic'], columns=['packaging']),
    pd.DataFrame(['none', 'chrome', 'gold'], columns=['accent']),
    pd.DataFrame([1, 1.25, 1.5, 1.75, 2, 2.5, 3, 4], columns=['factor'])
]

for d in dfs:
    d['key'] = 0

# A cross product has as many rows as the product of the individual frame sizes.
expected_row_count = 1
for d in dfs:
    expected_row_count *= d.shape[0]

print(f'Expected Rows: {expected_row_count:,}')

# Keep the pandas intermediate under its own name so `df` only ever refers to the
# Spark DataFrame that the streaming cell samples from.
combinations_pdf = dfs[0]
for d in dfs[1:]:
    combinations_pdf = combinations_pdf.merge(d, on='key', how='outer')

print(f'  Actual Rows: {combinations_pdf.shape[0]:,}')
assert combinations_pdf.shape[0] == expected_row_count, \
    'cross join produced an unexpected number of rows'

# Cached because the sending loop samples from this DataFrame on every iteration.
df = spark.createDataFrame(combinations_pdf).cache()
display(df)

In [0]:
import time
import pyspark.sql.functions as pyf
from datetime import datetime, timedelta
from random import gauss

def _bootstrap_servers_from_connection_string(connection_string, port=9093):
  """Derive the Event Hubs Kafka bootstrap address (``host:port``) from a connection string.

  Event Hubs connection strings contain an entry such as
  ``Endpoint=sb://<namespace>.servicebus.windows.net/``; the Kafka endpoint is
  that same host on ``port`` (9093 by default).

  Raises:
    ValueError: if no usable ``Endpoint=`` entry is found.
  """
  from urllib.parse import urlparse

  for part in connection_string.split(';'):
    # partition() splits on the FIRST '=' only, so base64 key values ending in '=' are safe.
    name, sep, value = part.strip().partition('=')
    if sep and name.lower() == 'endpoint':
      host = urlparse(value.strip()).hostname
      if host:
        return f'{host}:{port}'
  raise ValueError('Could not find an "Endpoint=sb://<namespace>.servicebus.windows.net/" entry in the event hub connection string.')


def send_new_rows_to_kafka(now, bootstrap_servers=None, max_batches=None):
  """Repeatedly sample random rows from ``df`` and publish them to Event Hubs over Kafka.

  Each iteration draws a random-size sample (with replacement) from the global
  ``df``, attaches a random ``sales`` amount, serializes every row to JSON and
  writes the batch to ``topic_name`` with Spark's Kafka sink, then sleeps for a
  random interval (about 10 seconds on average).

  Args:
    now: start of the first reporting window; only used in the progress message.
    bootstrap_servers: Kafka bootstrap address (``host:port``).  When ``None``
      it is derived from the ``Endpoint`` of ``event_hub_connection_string``.
    max_batches: stop after this many batches.  ``None`` (the default) keeps
      sending until the cell is interrupted.

  Raises:
    ValueError: if ``df`` is empty or the bootstrap address cannot be derived.
  """
  # NOTE(review): the `kafkashaded.` prefix matches Databricks' shaded Kafka client;
  # on open-source Spark the class is likely `org.apache.kafka...` without it — confirm.
  sasl_config = f'kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{event_hub_connection_string}";'
  if bootstrap_servers is None:
    bootstrap_servers = _bootstrap_servers_from_connection_string(event_hub_connection_string)

  rows_in_df = df.count()
  if rows_in_df == 0:
    raise ValueError('df is empty; there is nothing to sample.')

  batches_sent = 0
  while max_batches is None or batches_sent < max_batches:
    new_now = datetime.now()

    # Batch size and price scale are redrawn every iteration so the stream looks
    # irregular; abs() keeps both non-negative.
    target_rows = abs(int(gauss(75, 25)))
    fraction = target_rows / rows_in_df
    price_fuzz = abs(int(gauss(50, 30)))

    slice_df = (df
                  .sample(True, fraction)
                  .withColumn('sales', pyf.rand() * pyf.lit(price_fuzz) * pyf.col('factor'))
                  .withColumn('sales', pyf.round(pyf.col('sales'), 2))
                  .drop('factor', 'key')
                  # NOTE: monotonically_increasing_id() is derived from partition and row
                  # position, so these ids are only unique within a single batch.
                  .withColumn('transaction_id', pyf.monotonically_increasing_id())
                  .selectExpr("CAST(transaction_id AS string) AS key", "to_json(struct(*)) AS value")
                  # Cached so count() and the Kafka write below reuse the materialized batch.
                  .cache()
                )

    try:
      row_count = slice_df.count()

      (slice_df
        .write
        .format("kafka")
        .option("kafka.bootstrap.servers", bootstrap_servers)
        .option("kafka.security.protocol", "SASL_SSL")
        .option("kafka.sasl.mechanism", "PLAIN")
        .option("kafka.sasl.jaas.config", sasl_config)
        .option("topic", topic_name)
        .save())
    finally:
      # Release this batch's cache; otherwise every iteration leaves another cached DataFrame behind.
      slice_df.unpersist()

    print(f"Processed {row_count:,} rows between   {now}   and   {new_now}")
    batches_sent += 1

    now = new_now + timedelta(microseconds=1)
    if max_batches is not None and batches_sent >= max_batches:
      break
    time.sleep(abs(gauss(10, 4)))
  
# Open the first reporting window five seconds in the past, then keep sending until the cell is interrupted.
stream_start = datetime.now() - timedelta(seconds=5)
send_new_rows_to_kafka(stream_start)