Realtime Data Streaming | End To End Data Engineering Project
Zusammenfassung
TLDRO video explora un proxecto de enxeñaría de datos que utiliza múltiples tecnoloxías avanzadas, incluíndo Apache Airflow, Zookeeper, Kafka, Cassandra e PostgreSQL, todo containerizado con Docker. Comeza cunha API de xeración de usuario aleatorio que fornece datos falsos que logo son extraídos por Airflow e procesados nun fluxo de traballo que se move a través de Kafka. A arquitectura do sistema engloba un DAG en Airflow para recuperación intermitente de datos, os cales son enviados a unha cola de Kafka xestionada por Zookeeper. Os datos son visualizados nun Control Center e organizados por un Schema Registry. Apache Spark manexa o procesamento de datos con Cassandra actuando como destino de almacenamento final. Todo está executado en contedores Docker, promovendo a eficiencia na xestión e despregado de servizos interdependentes. O tutorial abarca desde configurar e executar os contedores ata a implementación e conexión coas distintas bases de datos, demostrando un fluxo completo desde a adquisición de datos (data ingestion) ata o seu almacenamento final.
Mitbringsel
- 🔧 Proxecto de enxeñaría de datos completo.
- 📡 Uso de tecnoloxías avanzadas como Apache Airflow e Kafka.
- 🐳 Sistema totalmente containerizado con Docker.
- 📊 Visualización de datos a través de Kafka Control Center.
- ⚙️ Apache Spark para procesamento de datos.
- 🔑 Schema Registry para xestión de esquemas Avro.
- 🎛️ Apache Zookeeper para xestión de brokers en Kafka.
- 🔄 Interacción e integración fluída entre múltiples sistemas.
- 📋 Uso de Python para scripts e operativa.
- 🔒 Almacenamento de datos procesados en Cassandra.
Zeitleiste
- 00:00:00 - 00:05:00
O vídeo comeza co presentador a saudar os espectadores e a introducilos no tema do vídeo, que está dirixido a persoas interesadas en enxeñaría de datos. Promete mostrar un proxecto de enxeñaría de datos de extremo a extremo, utilizando tecnoloxías como Apache airflow, zookeeper, Kafka, Cassandra e PostgreSQL, todas containerizadas con Docker.
- 00:05:00 - 00:10:00
O presentador mostra unha visión xeral da arquitectura do sistema. Empeza coa descrición do API xerador de usuarios random. Este API xera datos ficticios sobre persoas que se usan dentro do sistema para obter información mediante un DAG en Apache Airflow de forma intermitente e enviar os datos a Apache Kafka.
- 00:10:00 - 00:15:00
Avanza na explicación do fluxo de datos que van desde Apache Kafka usando Apache Zookeeper para o manexo de brokers múltiples, ata Apache Spark para o procesamento dos datos. A xestión dos contedores faise a través de Docker. Conclúe a visión xeral do sistema explicando como se visualizan os datos no Control Center e a importancia do Schema Registry para interpretar os datos en Kafka.
- 00:15:00 - 00:20:00
Comeza a implementación da parte da obtención de datos. O presentador demostra como configurar un proxecto en Python para traballar coa API Random User. Ensina aos espectadores como obter datos do API, formatealos e visualizalos no terminal. Utiliza Python operador de Airflow para cartear tarefas dentro da súa configuración por defecto.
- 00:20:00 - 00:25:00
A implementación da arquitectura avanza coa creación do fluxo de traballo en Apache Airflow. O presentador crea arquivos necesarios e configure o operador Python para manexar o fluxo de datos desde o API. Asegúrese de estabelecer parámetros básicos como a hora de comezo e quen posúe o traballo ou DAG.
- 00:25:00 - 00:30:00
O presentador demostra como enviar datos ao Apache Kafka desde o DAG creado en Apache Airflow. Instala as dependencias necesarias e configura un productor de Kafka para enviar os datos a un tópico chamado 'users-created'.
- 00:30:00 - 00:35:00
Conclúe a sección de configuración de Kafka introducindo configuracións dentro do ficheiro Docker-Compose para levantar contornos necesarios como Apache Zookeeper, brokers Kafka e servicios relacionados, asegurándose que todos poden comunicarse correctamente.
- 00:35:00 - 00:40:00
Corrección de erros na configuración de Docker-Compose proporcionada para asegurar a conexión axeitada entre todos os contedores incluíndo Apache Airflow e os servicios de Kafka.
- 00:40:00 - 00:45:00
O presentador pon a proba o sistema realizando operacións para confirmar que Apache Airflow e Apache Kafka funcionan correctamente, mediante a visualización de mensaxes no Control Center.
- 00:45:00 - 00:50:00
Comeza a implementar o uso de Apache Spark, asegurando previamente que todos os compoñentes de Apache Kafka e contornos Airflow están a funcionar correctamente. Continúa co código de Spark, dirixido a procesar datos de Kafka usando a arquitectura máster-traballador.
- 00:50:00 - 00:55:00
Explica a configuración dos containers Spark dentro de Docker-Compose, incluíndo mestre e traballadores. Indica cómo configurar os traballadores adicionais se se desexa.
- 00:55:00 - 01:00:00
Progresión á ferramenta de almacenamento con Apache Cassandra. Configura Docker-Compose para levantar unha instancia de Cassandra para recibir os datos procesados dende Apache Spark.
- 01:00:00 - 01:05:00
Detalla a creación do fluxo de traballo completo, incluíndo a configuración para que Apache Spark poida obter datos de Kafka e realizar tarefas de transformación antes de insertalos en Apache Cassandra.
- 01:05:00 - 01:10:00
Ofrece solución a erros no proceso descrito, especialmente no manexo de conexións e integración entre Apache Spark e Cassandra, asegurando que as conexións están corréctamente estabelecidas e as datos recibidas.
- 01:10:00 - 01:15:00
Descrición do proceso de finalización, comprobando a inserción exitosa de datos procesados en Apache Cassandra, e conclusión do pipeline de enxeñaría de datos de extremo a extremo.
- 01:15:00 - 01:20:00
Conclúe o vídeo revisando todo o pipeline construído, asegurando que cada compoñente está integrado e funcionando de maneira efectiva. Anima aos espectadores a unirse para máis contidos similares, subliñando a utilidade do proxecto mostrado.
- 01:20:00 - 01:27:47
Finaliza o vídeo agradecendo aos espectadores por seguir o tutorial e anima a compartir o vídeo e subscribirse para futuros contidos sobre enxeñaría de datos e proxectos similares.
Mind Map
Häufig gestellte Fragen
Que API se utiliza para xerar datos de usuario aleatorios?
Utilízase a API Random User Generator para xerar datos de usuario aleatorios.
Cal é o propósito de Apache Zookeeper neste proxecto?
Zookeeper actúa como xestor para Apache Kafka, xestionando múltiples brokers en Kafka.
Para que se usa Apache Airflow?
Apache Airflow utilízase para orquestrar a extracción de datos desde a API e o seu procesamento.
Como se visualizan os datos en Kafka?
Os datos en Kafka visualízanse a través dun Control Center, que ofrece unha interface para ver os temas e mensaxes en Kafka.
Que fai a Schema Registry?
Schema Registry proporciona unha interface para almacenar e recuperar esquemas Avro, o que axuda durante a visualización dos fluxos de datos de Kafka.
Que rol ten Apache Spark no sistema?
Apache Spark utilízase para procesar os datos obtidos de Kafka e despois envialos a Cassandra.
Como está containerizado o sistema?
Todo o sistema está containerizado utilizando Docker, facilitando a súa posta en marcha e xestión.
Cales son as linguaxes principais utilizadas no desenvolvemento deste proxecto?
As linguaxes principais son Python para a implementación de scripts e configuracións, con Docker para a containerización.
Que bases de datos se utilizan neste proxecto?
Úsanse PostgreSQL para almacenar datos de configuración e Cassandra para almacenar os datos procesados.
Cal é a función do Docker Compose neste proxecto?
Docker Compose utilízase para definir e correr múltiples contedores Docker, facilitando a configuración e xestión do proxecto.
Weitere Video-Zusammenfassungen anzeigen
- 00:00:00thank you
- 00:00:00[Music]
- 00:00:14hello and welcome back to the channel
- 00:00:16today's video is for people that are
- 00:00:19passionate about data engineering if you
- 00:00:22are looking to build scalable and robust
- 00:00:23system
- 00:00:25then today's video is just for you
- 00:00:27before we continue don't forget to
- 00:00:29subscribe to the channel and ring the
- 00:00:31notification Bell so you don't miss out
- 00:00:33on any future video
- 00:00:35in today's episode I'll be walking you
- 00:00:37through an end-to-end data engineering
- 00:00:39project that stitches together several
- 00:00:41essential Technologies like Apache
- 00:00:44airflow Apache zookeeper Kafka Cassandra
- 00:00:47and postgres SQL all containerized with
- 00:00:50Docker so let's Dive Right In and get
- 00:00:53started let's take a look at the eye
- 00:00:55level view of the system architecture we
- 00:00:57start by having an API which is the
- 00:00:59random user generator API this is used
- 00:01:01for generating random user data
- 00:01:04cells like lorem ipsum but this is
- 00:01:07basically for people
- 00:01:09all this data is not surreal and these
- 00:01:13are not real people
- 00:01:15this is how to use the API and this is
- 00:01:17the results that we're expecting from
- 00:01:19the API itself so going back to the
- 00:01:21architecture we are going to have a dag
- 00:01:23in Apache airflow which is fetching data
- 00:01:26from this API intermittently
- 00:01:29the data fetch is going to be streamed
- 00:01:33into Kafka Q by the way Apache airflow
- 00:01:36with our configuration is going to be
- 00:01:38running on postgres SQL
- 00:01:41now the data is that we get from this
- 00:01:43API is going to be streamed into Kafka
- 00:01:45which is sitting on Apache zookeeper
- 00:01:49and the Zookeeper is the manager to
- 00:01:51manage all the multiple Brokers that we
- 00:01:53have on Kafka so if we have multiple
- 00:01:55Brokers let's say we have five or three
- 00:01:58forecasts for Kafka broadcast zookeeper
- 00:02:01is going to be the manager so when one
- 00:02:02goes down another one replaces the
- 00:02:04dollar is restarts the process itself
- 00:02:07then the data inside the Kafka broadcast
- 00:02:10will be visualized in our control center
- 00:02:13the control center serves as a UI where
- 00:02:16we can see what is going on in our Kafka
- 00:02:18broadcast the number of messages that
- 00:02:20are coming into different topics and the
- 00:02:22different topics on the Kafka Cube while
- 00:02:24the schema registry it provides us a
- 00:02:26seven layer for the metadata the schema
- 00:02:29registry is a restful interface for
- 00:02:31storing and retrieving average schema
- 00:02:33which is particularly useful when the
- 00:02:36Kafka streams uh is being visualized so
- 00:02:40we can understand the schema of the
- 00:02:41records all the data that is coming into
- 00:02:43Kafka
- 00:02:44so the data that we get from Kafka will
- 00:02:47be streamed
- 00:02:48with um Apache spot
- 00:02:51we have a master worker architecture
- 00:02:53being set up on Apache Spark when a job
- 00:02:57is submitted to the master
- 00:02:59the master decides which of the worker
- 00:03:01takes up the job and run the data run
- 00:03:05the task
- 00:03:07once task is run the task in this case
- 00:03:09will be to stream data into Cassandra so
- 00:03:12we have a listener
- 00:03:14that is going to be getting data from
- 00:03:16Kafka into spark then streamed directly
- 00:03:19into Cassandra
- 00:03:21all this architecture are running on
- 00:03:24Docker containers which you are going to
- 00:03:26have a single Docker compose file
- 00:03:29that helps us to spin up all these
- 00:03:33architecture that's the basic
- 00:03:35architecture we have right now and
- 00:03:38we can dive right into
- 00:03:40the first level getting data from the
- 00:03:44random user API without a dag
- 00:03:47I'm going to start by creating a new
- 00:03:49project called Data engineering
- 00:03:55I'm using python 3.11
- 00:03:59so you can use any python I think the
- 00:04:02minimum version
- 00:04:03that may be supported should be 3.9 so
- 00:04:06but in my case I'll be using 30.11
- 00:04:13in this environment we have a main.py
- 00:04:17which we can we're going to be using
- 00:04:19this stamina a lot so it's best to
- 00:04:21ensure that our terminal is properly set
- 00:04:23up I'm going to start by deactivating
- 00:04:25the current Source then I'm going to
- 00:04:26Source it again
- 00:04:28I'm going to increase the size of the UI
- 00:04:30so it's visible to everybody so I'm
- 00:04:33going to have a
- 00:04:34my editor I'm going to have a font I'm
- 00:04:37going to increase it to 20.
- 00:04:40okay
- 00:04:42all right
- 00:04:43with this if I do python main.py
- 00:04:47I python good so that means the
- 00:04:49environment is properly set up and if I
- 00:04:52check which python
- 00:04:54is in my let's see the right directory
- 00:04:56good
- 00:04:57all right I don't need to do something
- 00:04:59py so I'm going to delete it
- 00:05:03I'm going to create a new folder
- 00:05:06I'll call the folder Doug
- 00:05:10in because we've activated our BMV so we
- 00:05:13need to install the base package that
- 00:05:15we're going to be using keep install I'm
- 00:05:17going to install Apache airflow
- 00:05:21now this is going to initialize the
- 00:05:22airflow so we're going to start from
- 00:05:23this guy
- 00:05:24connected to this API level and we're
- 00:05:27going to spin up this instance in a
- 00:05:29little bit
- 00:05:43now that the installation is done I'm
- 00:05:45going to be
- 00:05:48I'm going to be creating a new file in
- 00:05:50our dag and I'll call it Kafka stream
- 00:05:57this file is going to be where we are
- 00:05:59going to be running the the dark from so
- 00:06:02I'm going to be importing some packages
- 00:06:03from airflow and some internal packages
- 00:06:06so from date time
- 00:06:09import date time
- 00:06:11then from airflow I'm going to be
- 00:06:14importing dark
- 00:06:17all right
- 00:06:19the other thing that I'll be importing
- 00:06:21from airflow is the python operator that
- 00:06:23I'll be using
- 00:06:24to fetch this data operators.python
- 00:06:29import python operator
- 00:06:33now I need a default argument which is
- 00:06:35going to be my default ax
- 00:06:37that I'll be using to
- 00:06:40attach to my dag itself so to know who
- 00:06:43owns the project and some properties
- 00:06:45that I'll be using so the owner is going
- 00:06:48to be
- 00:06:49going to just do a scholar
- 00:06:52then it starts date
- 00:06:55is going to be
- 00:06:56the start date is going to be date time
- 00:07:02that I just imported and that would be
- 00:07:052023 2023
- 00:07:09uh
- 00:07:11nine
- 00:07:13three and uh I could just use 10 o'clock
- 00:07:17it doesn't really matter what time it
- 00:07:19says
- 00:07:20all right so this is uh 2023
- 00:07:23the August 39 September 3 and then 10
- 00:07:29o'clock
- 00:07:31all right now that I have my default AG
- 00:07:34so I'm going to create my dag which is
- 00:07:37going to be where uh where it's going to
- 00:07:41serve as as an entry point so I'm going
- 00:07:43to have with dag which is my dag I'm
- 00:07:47going to call the task ID
- 00:07:49uh the ID is going to be user automation
- 00:07:54I'll call it user automation
- 00:07:56all right and my default ads default ads
- 00:08:00is going to be
- 00:08:02default ads the scheduled interfer is
- 00:08:05going to be
- 00:08:07daily uh call this art daily
- 00:08:12and there is no need for catch up so
- 00:08:14I'll just put catch up
- 00:08:16close so this is going to be running as
- 00:08:18our dag I'm going to minimize this for
- 00:08:20now
- 00:08:22then
- 00:08:24we are going to have a python operator
- 00:08:27I'm going to call this streaming task
- 00:08:30okay it's going to be a python operator
- 00:08:32and
- 00:08:33I'll call the task ID
- 00:08:36stream
- 00:08:38data from
- 00:08:40API
- 00:08:42okay
- 00:08:44then with python operator we need the
- 00:08:46python callable which is going to be the
- 00:08:49function that we're going to be calling
- 00:08:50so I'll call this stream data function
- 00:08:54so we don't have it so we need to create
- 00:08:57a function so I'll call this the Stream
- 00:09:02data
- 00:09:03I'm going to be importing Json because
- 00:09:06this is going to be what I'll be
- 00:09:08formatting the response like us so what
- 00:09:12I need to do is I I need to import
- 00:09:14request
- 00:09:15to get the data from uh request to get
- 00:09:20your data from the API itself so I'm
- 00:09:22going to have
- 00:09:23a request
- 00:09:26.get
- 00:09:27I'm going to go into the API
- 00:09:30in here
- 00:09:32I'll just copy this URL
- 00:09:36I'll paste it in here as my API there's
- 00:09:39no other parameters to be set so it's
- 00:09:41fine so what I'll do is I'll go back in
- 00:09:44here my request is going to be uh saved
- 00:09:47in response so I'm going to have
- 00:09:50press.json so I'm going to have stream
- 00:09:53data at this point so we can test run
- 00:09:56this before running the
- 00:09:58the dug itself so rest.json I'm going to
- 00:10:02print in this
- 00:10:05okay
- 00:10:07so if I run this
- 00:10:09by the time you know
- 00:10:11I'm going to clear this up
- 00:10:13and I do python docs and I call this
- 00:10:16Kafka
- 00:10:18string
- 00:10:21so it should get me the data
- 00:10:24from the API
- 00:10:27uh yeah I'm going to comment this out
- 00:10:33oh no
- 00:10:35yeah I'll just comment that
- 00:10:38okay and
- 00:10:41I'll run this again
- 00:10:43okay good so I have access to the
- 00:10:46results from the API and as expected we
- 00:10:49have the result which is coming in an
- 00:10:51array
- 00:10:52of data and the first record at this
- 00:10:55point is this guy
- 00:10:58the second part
- 00:11:00is the info so we only need access to
- 00:11:03this
- 00:11:04to the results and this is results and
- 00:11:06info we don't need this info for now we
- 00:11:09need this result and we only need the
- 00:11:11access to the first record which is this
- 00:11:13Json the large Json that we have so to
- 00:11:17put that in context we are going to be
- 00:11:20getting the response somewhere here
- 00:11:24and
- 00:11:25I'm going to put that press because to
- 00:11:29your breasts I'm going to get the
- 00:11:31results part of the data and I'll get
- 00:11:34the first index which is this guy
- 00:11:37from here the first index from that
- 00:11:40so if I print this I'm going to put I'm
- 00:11:43going to print this
- 00:11:47okay and let's run this again
- 00:11:51I have the gender and I think we can
- 00:11:55still format this uh
- 00:11:57nicely can't we I think we can do
- 00:12:01json.toms we just dumped the J at the
- 00:12:04response and then we indent it as a
- 00:12:07maybe three
- 00:12:09if we do that
- 00:12:10we'll run this again
- 00:12:13I should run this again we shall have
- 00:12:15something pretty nicely laid out for us
- 00:12:17so we have the gender which is the mail
- 00:12:20the location email login ID and all that
- 00:12:25which is good I think this is a good
- 00:12:27step forward before we continue to
- 00:12:29ensure that the each of the task is fine
- 00:12:33all right so I have the stream data but
- 00:12:36this is not exactly what I want so what
- 00:12:38I want is I'm going to separate this a
- 00:12:41little bit so I have get data
- 00:12:44get data
- 00:12:46so I'm going to move all this guy
- 00:12:48into this function and I'm going to
- 00:12:51return at this point I'm going to return
- 00:12:54response because this is what I need I
- 00:12:57just need to get the API data
- 00:13:00format it in Json
- 00:13:02that's the right I get the response in
- 00:13:04Json format and I get the results and
- 00:13:07the first index of that result that's
- 00:13:08what I'm doing with the get data
- 00:13:10function
- 00:13:13meanwhile
- 00:13:14if I come back in here
- 00:13:17I need to put this in in a
- 00:13:21in a specific format okay because that's
- 00:13:25what I want to put on my Kafka queue
- 00:13:27right I need to have a format data where
- 00:13:30I'm going to be formatting this
- 00:13:31particular data that is coming in so I'm
- 00:13:33going to have format data
- 00:13:35so I'm going to get the response that I
- 00:13:38get from the API
- 00:13:40into this function so I'm going to have
- 00:13:42a single
- 00:13:44I have a variable called function this
- 00:13:47object this is where I'm going to be
- 00:13:49holding all these data
- 00:13:51okay yeah it's a flute
- 00:13:55so I can put this somewhere here
- 00:14:00I'll just uh
- 00:14:02put it on the side
- 00:14:05and I should be able to see all the data
- 00:14:10and won't go like that
- 00:14:12yeah
- 00:14:13so if I have this
- 00:14:15I have the gender as a Mill and so what
- 00:14:19I need to do I need to extract this data
- 00:14:21so what I what I want to do is get the
- 00:14:23I'll start with the the uh the first
- 00:14:26name so I'm going to have the data
- 00:14:28first name
- 00:14:30which is going to be coming from the
- 00:14:33name Let's see we have the name which is
- 00:14:36the first and the last name right so
- 00:14:39what I want to do is I want to get a
- 00:14:41name
- 00:14:42and I want to get the first name
- 00:15:15press because to format data and I'll
- 00:15:18put in press
- 00:15:20and let's see if I I have the right
- 00:15:23results
- 00:15:25if I run this again
- 00:15:27I should have
- 00:15:30yeah it says picture
- 00:15:32yeah instead of date I should be rest of
- 00:15:35course
- 00:15:37that's a typo
- 00:15:39now we have a more streamlined data
- 00:15:42which is more of what we need we have
- 00:15:45the first name for this person and if
- 00:15:48you look at this data that we have
- 00:15:51yeah
- 00:15:53if you look at the data this is the
- 00:15:55picture of the guy that we have which is
- 00:15:58fine that's what we need
- 00:16:01as of this moment so the next thing we
- 00:16:04need to do is we need to add Kafka
- 00:16:07teacher to this well before we do that
- 00:16:09we need to set up our Kafka this is just
- 00:16:12a
- 00:16:13this is just a way for us to get access
- 00:16:15to this API so the next thing we are
- 00:16:18going to do
- 00:16:20these are going to set up Apache airflow
- 00:16:23on the docker container right now I just
- 00:16:25import installed Apache airflow it has
- 00:16:28not been doing anything on my system so
- 00:16:30we need to set up a Docker compose file
- 00:16:33that is going to be
- 00:16:35initializing this Apache air flow and
- 00:16:38Kafka the schema registry and the rest
- 00:16:41okay
- 00:16:43now so what we need to do is we need to
- 00:16:46in our files here
- 00:16:49I'll just put this back
- 00:16:51where it's supposed to be and I'll just
- 00:16:53minimize this so I'll just come in here
- 00:16:56in our root directory I'll create a new
- 00:16:58file I'll call it Docker file Docker
- 00:17:01compose
- 00:17:03because that's what we want we want to
- 00:17:05we we we don't need a Docker file really
- 00:17:08what we need is a Docker compose so we
- 00:17:10can just spin up and spin down at any
- 00:17:12point in time we want so which is fine
- 00:17:15so I'm going to start with the Zookeeper
- 00:17:17so we can just put in this architecture
- 00:17:19pretty nicely so it looks like the
- 00:17:22dependencies from Apaches zookeeper down
- 00:17:25to the Kafka Kafka is Con connected to
- 00:17:28the control center then the schema
- 00:17:30registry and this guy is stand alone
- 00:17:32because it's not connected to the
- 00:17:34Confluence architecture we're going to
- 00:17:36be using the golf plate systems here
- 00:17:38then we're going to have a separate
- 00:17:40system which is going to be discard the
- 00:17:43Apache spark and a separate system for
- 00:17:45Cassandra so so let's see how that is
- 00:17:48going to happen
- 00:18:03now this should prepare a zookeeper
- 00:18:07ready for operation so this is basic
- 00:18:10stuff and if you need to do some more
- 00:18:15detailed configuration of the zoo keypad
- 00:18:17so you maybe you have some special
- 00:18:19parameters that you want to to look into
- 00:18:22I think it may be best to consult the
- 00:18:25documentation so on the different
- 00:18:27environment variables and our best to
- 00:18:29tweak this configurations but but for
- 00:18:32now this should do for us
- 00:18:33in our in our case so we're going to
- 00:18:36have a broker
- 00:18:37and I'll call the image name this is
- 00:18:39going to come from Confluence Inc also
- 00:18:42and this is going to be CP server so
- 00:18:44this is going to be a broker
- 00:18:46uh if you check the docker up you should
- 00:18:49see so right now we just ticked off
- 00:18:52Apache zookeeper so the next one we are
- 00:18:55working on is this Kafka
- 00:18:59the server
- 00:19:40so that passes for uh
- 00:19:43uh Brew guide so so we need to go now
- 00:19:46for the schema registry
- 00:20:10and uh
- 00:20:12I think for the dependence the only
- 00:20:14dependency we have left is the control
- 00:20:16center
- 00:20:17but technically the control center is
- 00:20:20dependent on the schema registry because
- 00:20:23the average schemas that this the
- 00:20:26average schema that the schema registry
- 00:20:29allows the Kafka to visualize
- 00:20:32on the UI it's going to be it's going to
- 00:20:35be a dependencies on the schema registry
- 00:20:37so technically this control center is
- 00:20:40listening for events on schema registry
- 00:20:43to visualize the data directly on Kafka
- 00:20:46which is managed by zookeeper so that's
- 00:20:49the dependency Dynamics anyways so we
- 00:20:52just need to add the control center
- 00:21:09so we don't have any engines here but if
- 00:21:12you look at the images I already have
- 00:21:14some images
- 00:21:15installed already I already downloaded
- 00:21:17the
- 00:21:19the images from
- 00:21:23from the confluence
- 00:21:27Docker help so all I need to do is do
- 00:21:30Docker compose up
- 00:21:33and I'll do in the dash mode
- 00:21:37so this is going to create so for some
- 00:21:39if this is your first time without these
- 00:21:42images what it's going to do is going to
- 00:21:45pull those images down to your system
- 00:21:46but because I already done that so
- 00:21:49that's why you're not seeing the pulling
- 00:21:51so it's I already have the images and
- 00:21:53it's going to just spin up the
- 00:21:54containers from those images directly
- 00:21:58so now this is what I was talking about
- 00:22:00the other time Suzuki bikes is only when
- 00:22:03it was LD that the broker started and
- 00:22:06you can see these guys are still created
- 00:22:08waiting for the broker to be held before
- 00:22:10it continues if you look at the UI they
- 00:22:12are not they are not started now that
- 00:22:15this guy is uh is running is because the
- 00:22:18broadcast says tit is ready to accept
- 00:22:20connection
- 00:22:22so yeah so that's the dependency level
- 00:22:24until one is done before the other uh
- 00:22:28continue so if there's any error at this
- 00:22:30point
- 00:22:31we we need to check
- 00:22:33to be sure that the environment is ready
- 00:22:36and ready to accept connection before we
- 00:22:39continue
- 00:22:40so we can fix any error
- 00:22:42and uh
- 00:22:44continue
- 00:22:47so we have the schema registry I think
- 00:22:50it's going to take the most time because
- 00:22:52it's uh
- 00:22:54I think it's 30 seconds because you know
- 00:22:56yeah it's 30 seconds so it's going to be
- 00:22:58checking every 30 seconds
- 00:23:01and the timeout is 10 seconds maybe I
- 00:23:03should have reduced that but right now
- 00:23:05this uh
- 00:23:07let's see
- 00:23:13and these are warnings we should be fine
- 00:23:18we're waiting for yeah the schema
- 00:23:21registry is okay now and the control
- 00:23:22center is coming up so which is okay uh
- 00:23:26the control center also is started which
- 00:23:28is fine I think at this point
- 00:23:30uh we are done with this part we only
- 00:23:34need to confirm on the UI that the
- 00:23:37environment is up and running and that
- 00:23:40is ready to accept connections so what
- 00:23:42we need to do
- 00:23:43if you just click on this guy or go to
- 00:23:46localhost 9021
- 00:23:49I think we are a little bit early so
- 00:23:52let's check
- 00:23:55yeah the control center is still
- 00:23:56starting off
- 00:24:01we just wait till it's done
- 00:24:04he says started Network server and this
- 00:24:07is ready
- 00:24:08okay so if we refresh this page we
- 00:24:11should be good
- 00:24:14now this is our control center
- 00:24:18so the importance of those connections
- 00:24:19that we did uh is to be sure that we
- 00:24:23have the right Brokers and we can
- 00:24:25visualize the uh
- 00:24:27the data right now we don't need any
- 00:24:30connections to K SQL DB or connect which
- 00:24:33is fine
- 00:24:34now we may do that in the next in a
- 00:24:37different tutorial but not in this
- 00:24:39tutorial we're good with classical DB
- 00:24:41and the rest
- 00:24:42but let's focus on the broker which is
- 00:24:45the most important thing here and there
- 00:24:46we can see the topics and the production
- 00:24:48and consumption so if you go in here we
- 00:24:51have the UI the broker the topics the
- 00:24:54connect we don't have anything there and
- 00:24:56if you do cut the Brokers these are the
- 00:24:57this is what has been produced
- 00:25:00in the last
- 00:25:02in the last few seconds which is 24
- 00:25:05kilobytes and 24 bytes per second and
- 00:25:08the rest so this is uh and the Zookeeper
- 00:25:10is connected you can see the Zookeeper
- 00:25:12is connected
- 00:25:14um self balancing is on yeah and the
- 00:25:16rest is fine so we only have we have a
- 00:25:18good broker name in here the Bison bites
- 00:25:21out and the rest and these are the other
- 00:25:23properties so what we really uh want is
- 00:25:26this topic
- 00:25:28as of this moment we don't have any
- 00:25:30topics created yet so we don't need to
- 00:25:33create anything we we do that
- 00:25:35automatically by the time we start
- 00:25:36publishing data into it so let's
- 00:25:39continue so that's the first uh this the
- 00:25:41second part anyways so if you connect to
- 00:25:44on our Kafka stream
- 00:25:46if you go back to a Kafka stream at this
- 00:25:48point
- 00:25:49which is what we have so we need to
- 00:25:51connect to the Kafka queue so we can see
- 00:25:54we just maybe publish one or two data
- 00:25:57into the Kafka queue and we'll see if it
- 00:25:59is working or not
- 00:26:00so we need to install a package for that
- 00:26:02which is going to be peep install Kafka
- 00:26:05python by the time you get this data
- 00:26:08from
- 00:26:10the random usage of me API we format the
- 00:26:13data right which is here and now we are
- 00:26:16dumping the data so we don't need to
- 00:26:18dump it for now we just uh go ahead and
- 00:26:22publish this data so we come in here
- 00:26:25I'm going to import Kafka I mean from
- 00:26:29Kafka
- 00:26:32producer
- 00:26:37uh and I think I need to import the time
- 00:26:39because what we want to do is we want to
- 00:26:42be producing to how to set the number of
- 00:26:44time then we start producing
- 00:26:47so uh let's see
- 00:26:50uh I just want to send a single data to
- 00:26:52the producer so let's initialize the
- 00:26:54producer at this point so we have a
- 00:26:56producer to be Kafka
- 00:26:58producer the bootstrap server
- 00:27:02bootstrap
- 00:27:05servers
- 00:27:07it's going to be we are connecting to
- 00:27:09the broker
- 00:27:11on 2902 however because we are not yet
- 00:27:15on the on the docker containers of this
- 00:27:18time we need to use the uh the external
- 00:27:21IP address which is the localhost and
- 00:27:25Report 9092
- 00:27:28uh I think we should set the max timeout
- 00:27:32the max block
- 00:27:35Ms which is the timeout it's going to be
- 00:27:3855 seconds
- 00:27:40all right
- 00:27:43okay so let's publish and push the data
- 00:27:46that we get
- 00:27:47from this guy we push it to the queue so
- 00:27:50we have producer
- 00:27:53dot send
- 00:27:55we have
- 00:27:57users
- 00:27:59created
- 00:28:00and we do a json.doms we just dump the
- 00:28:04data and then code it in UTF
- 00:28:07utf-8
- 00:28:10encoding
- 00:28:13ah not data Express yeah
- 00:28:16okay I think this should get us uh
- 00:28:20data to the queue let's see
- 00:28:23I'm going to do a python Kafka stream
- 00:28:26dot Pi so let's see if it is able to
- 00:28:28push data to the queue are we able to
- 00:28:30connect
- 00:28:31DQ it says let's see
- 00:28:36connecting to this guy
- 00:28:38proving this connecting connection
- 00:28:40complete so we are able to connect and
- 00:28:43push data so we come back into the car
- 00:28:46into the topic and we refresh
- 00:28:57excellent so we have a users created
- 00:29:00and we have data on the keyboard right
- 00:29:02now because we're not listening to this
- 00:29:03we can't see anything so if we run this
- 00:29:06again
- 00:29:10if I run this again we should see new
- 00:29:12messages coming in
- 00:29:17let's see
- 00:29:20good so this is the data that is coming
- 00:29:23on task Creek guy the first name is
- 00:29:26Wyatt Willis on task Creek Australia and
- 00:29:30the rest
- 00:29:31if you look at this guy
- 00:29:34uh we didn't print it out anyways but
- 00:29:37that's the data that is coming in so we
- 00:29:39are able to push the data to the queue
- 00:29:41and that's a good win for us so the next
- 00:29:43thing for is to sell so we have we have
- 00:29:47the
- 00:29:49we have the Zookeeper connected to Kafka
- 00:29:52successfully we're able to use the
- 00:29:55schema registry Avro schema to visualize
- 00:29:58the data on the topic which is fine so
- 00:30:00this part of the system is good now we
- 00:30:04need to push data from
- 00:30:07airflow to postgres I mean from airflow
- 00:30:11to Kafka not postgres so what we need to
- 00:30:14do now is initialize our workflow our
- 00:30:16airflow with progress server so we just
- 00:30:20come back in here to reduce the the
- 00:30:22workload
- 00:30:25ah so we come back to our python okay
- 00:30:30in here we converted Docker compose just
- 00:30:34minimize this for now
- 00:30:36uh right now we need to add our
- 00:30:41we need to add our web server
- 00:31:00and that's it so the final thing we need
- 00:31:03to do before we spin this up is to add
- 00:31:05our scripts for the entry point so we
- 00:31:08need to have a directory called
- 00:31:10a script
- 00:31:12in our script we're going to have entry
- 00:31:14points
- 00:31:15dot sh and in this entry point.sh what
- 00:31:19we want to do is we want to write the
- 00:31:20sequence of command that airflow should
- 00:31:24follow when it's trying to initialize
- 00:31:26the web server or the scheduler itself
- 00:31:29so we need to have a Bim Bash
- 00:31:48this is our entry point.sh so we need to
- 00:31:50add that to our startup volume which is
- 00:31:54going to be inside the script
- 00:31:56entry point dot sh
- 00:32:01to be synchronized into opt F flow
- 00:32:07is it Scripts
- 00:32:09entry point
- 00:32:12dot sh and the entry point file is going
- 00:32:16to be pointing to
- 00:32:18opt
- 00:32:20airflow
- 00:32:21script entry point
- 00:32:24it's not script see script entry point
- 00:32:28sh yeah
- 00:32:33and uh
- 00:32:34because we are connecting to postgres we
- 00:32:37need to set up our postgres because
- 00:32:40right now what we are doing basically is
- 00:32:43to uh alikemi is alchemy
- 00:32:46yeah
- 00:32:48Skill Academy
- 00:32:52and I think this is usually
- 00:32:55double underscore airflow the one that's
- 00:32:58called that yeah
- 00:33:00what am I missing uh sequential yeah I
- 00:33:04think this should be fine and the last
- 00:33:07part is going to be the
- 00:33:09is going to be the postgres which is uh
- 00:33:12where is it
- 00:33:14yeah
- 00:33:16yeah I need to just initialize the
- 00:33:18postgres which is uh postgres
- 00:33:25so what is on the same network
- 00:33:27one other thing that is left is the
- 00:33:30scheduler which is not initialized right
- 00:33:33now so what we need to do is to write a
- 00:33:36simple script for this scheduler we are
- 00:33:39going to have a scheduler
- 00:33:51so let's see if there's any error so we
- 00:33:54can quickly debug and continue our
- 00:33:56coding
- 00:34:03the web server is running and let's see
- 00:34:05if it is working as expected
- 00:34:08okay good uh it loaded the the
- 00:34:12requirement requirements txt and it's
- 00:34:14running it at this point which is good I
- 00:34:17think this this shows that uh
- 00:34:19environment is properly set up
- 00:34:22and the data is running as a specter
- 00:34:24good
- 00:34:32so uh web server is running at 8080
- 00:34:35because the web server is running I
- 00:34:38think what we said in our Docker compose
- 00:34:42was to be checking every every 30
- 00:34:45seconds
- 00:34:47yeah he's going to be checking every 30
- 00:34:49seconds so even though it's ready
- 00:34:52and we just need to wait for like 30
- 00:34:53seconds before this guy picks up
- 00:34:56and while that is while that is running
- 00:34:59I think we can go into our localhost
- 00:35:02and go into 1880
- 00:35:05and see if our airflow is properly set
- 00:35:08up so airflow is running and we have a
- 00:35:11admin which is our admin admin
- 00:35:14the sign in
- 00:35:16we should be able to so yes good the
- 00:35:19airflow is uh I mean is running as
- 00:35:22expected but there is no scheduler
- 00:35:25even though we are using a sequential
- 00:35:27executor which is not advisable for
- 00:35:30production because it's going to be
- 00:35:31running your task one after the other if
- 00:35:33you want to run tasks in parallel and uh
- 00:35:36in a more scalable way
- 00:35:38uh if you don't use this you should use
- 00:35:40something like a salary execute or
- 00:35:42something like that
- 00:35:44so what is happening
- 00:35:49websitis on LV just
- 00:35:52fix this
- 00:35:55and the scheduler should be up
- 00:36:00right the scheduler is also up
- 00:36:03and it's uh started so it's doing the
- 00:36:06same installation because we need to
- 00:36:08install airflow on the scheduler too so
- 00:36:12let's see if it is
- 00:36:14also going to be
- 00:36:16running as expected so while this is uh
- 00:36:18the installation is going on so we don't
- 00:36:20waste too much time
- 00:36:21we just proceed to our Kafka stream and
- 00:36:25we we fine-tune this and we can set up
- 00:36:27our dag so we can start seeing some
- 00:36:29stuff and movement on the UI so uh come
- 00:36:32back in here
- 00:36:33and minimize this
- 00:36:35uh so I have this guy and I just
- 00:36:38uncomment this
- 00:36:41I I don't need the stream data I know
- 00:36:44it's working I can see some data pushed
- 00:36:46to the queue and this is my data at this
- 00:36:50point
- 00:36:51yeah
- 00:36:53however we need to fine-tune this now
- 00:36:55that this guy will be running on the on
- 00:36:57the docker instance so we need to change
- 00:36:59this these are broker and then it's
- 00:37:02going to be uh broke at 2902.
- 00:37:05to use the internal IP address
- 00:37:10hmm
- 00:37:12I'm going to take a quick pause for the
- 00:37:14scheduler to be done and then I'll
- 00:37:16resume once it is done
- 00:37:20now that the schedule is has done is
- 00:37:23done initializing I think the next thing
- 00:37:25for us to do
- 00:37:27is to check uh the UI at this point
- 00:37:31so let's see if the UI is going to be
- 00:37:33updated so we just need to refresh
- 00:37:38now the error the warning message is
- 00:37:41gone and we can see our user automation
- 00:37:43doc
- 00:37:45that's a good one so now that we have
- 00:37:47our user automation Dag you can see
- 00:37:51if the the grid is initialized properly
- 00:37:58good so we have stream data from API
- 00:38:01which is good so the next thing we want
- 00:38:03to do is we want to switch the um we
- 00:38:07want to make sure that we are able to
- 00:38:09stream data directly from
- 00:38:11the random user API directly into Kafka
- 00:38:15as much as possible so instead of just
- 00:38:19one producing that we're doing we want
- 00:38:21to be producing all the data that are
- 00:38:24going to be sent from random user into
- 00:38:27Kafka directly so we just need to modify
- 00:38:29this particular
- 00:38:31this particular script this function
- 00:38:34will be updated
- 00:38:36so you have to get data from our data
- 00:38:38we're sending data to Kafka at this
- 00:38:40point
- 00:38:42now so instead of just this part we need
- 00:38:45to remove the Kafka producer maybe oh
- 00:38:50in here
- 00:38:58and then we just do oh well
- 00:39:02here
- 00:39:03so we have we've imported time
- 00:39:06so what we want to do is we want to
- 00:39:09finish that part which is if time time
- 00:39:16we need to get the current Style
- 00:39:21so we just uh this this every time
- 00:39:27so what we're going to do is we want to
- 00:39:29stream for like two minutes or even five
- 00:39:33minutes or six one minute I think we're
- 00:39:35going to be producing rapidly so what we
- 00:39:37want to do is we want to get the current
- 00:39:38time
- 00:39:43we just break this Loop otherwise
- 00:39:47if it is not
- 00:39:50better than one minute what you want to
- 00:39:52do is once you get the data
- 00:39:55from here so we want to move this guy
- 00:40:00from here to this place
- 00:40:04so we want to get the data
- 00:40:06format it and then
- 00:40:09maybe we just
- 00:40:11producer
- 00:40:13we already have a script down here
- 00:40:16so we just move this so
- 00:40:20so what we are doing here is while the
- 00:40:23time is between one minute so while we
- 00:40:26are producing within uh
- 00:40:28from 0 to 60 Seconds
- 00:40:30what I wanted is we want to as many
- 00:40:32times as possible send requests to
- 00:40:35random user.me API get the data format
- 00:40:38that data and send it to me
- 00:40:40to the queue itself so we just have a
- 00:40:44exception sorry accept uh exception
- 00:40:50as e
- 00:40:51and then we just log that we just import
- 00:40:56login
- 00:40:59so we just uh log that part just log in
- 00:41:04if I can only spell login
- 00:41:07and arrow chord
- 00:41:10and the error
- 00:41:12is
- 00:41:21the error is e
- 00:41:25okay
- 00:41:27and what we want to do is we want to
- 00:41:30continue so even if maybe there is a
- 00:41:32network downtime for like a two or two
- 00:41:34seconds or 30 seconds it doesn't matter
- 00:41:37we just logged that particular arrow and
- 00:41:40continue the loop and once it is one
- 00:41:42minute we break
- 00:41:43that's what we want to do so that's uh
- 00:41:46that's that with this uh adjustment at
- 00:41:49this point so what we want to do is we
- 00:41:51want to go back to the UI
- 00:41:53refresh and Trigger the dag from the UI
- 00:41:58yeah it's done so what we can do is we
- 00:42:01trigger this we just turn it on
- 00:42:09yeah okay so it's not triggering
- 00:42:11automatically because we didn't enable
- 00:42:13catch up so I just turn it on I just
- 00:42:16triggered it and there's just three I
- 00:42:19use automation should start any moments
- 00:42:22now do we have anything in the logs I
- 00:42:25don't think so so but let's let's follow
- 00:42:28the production on the
- 00:42:31the Kafka you are I'm going to just
- 00:42:33refresh this page
- 00:42:37okay so we are listening for new events
- 00:42:40on the on the UI
- 00:42:43just waiting for this to get triggered
- 00:42:46okay so it's running now
- 00:42:52once you start seeing data drop good so
- 00:42:55data is dropping on the UI and it's
- 00:42:57going to keep dropping for at least one
- 00:43:00minute
- 00:43:01we can trigger the
- 00:43:05the list from here
- 00:43:07and you can see the the offset number
- 00:43:11and the data that are coming in anyways
- 00:43:13good so while this is going on so that
- 00:43:16means uh at this point all our data is
- 00:43:20fine so we want to write the
- 00:43:24if you will check our UI so we've taken
- 00:43:27care of this part
- 00:43:28and we've taken care of the architecture
- 00:43:30so we want to set up the spark
- 00:43:32architecture and the Cassandra which are
- 00:43:35the other parts that is left that are
- 00:43:37left so we have the Masterwork
- 00:43:39architecture uh for this uh for the
- 00:43:43purpose of this session I will just use
- 00:43:45one uh Walker and I'll show you how to
- 00:43:48add the other workers if you want to add
- 00:43:50them
- 00:43:51and then we'll go from there
- 00:43:54we just go back to our Docker compost
- 00:43:57in here I'll just copy paste the
- 00:44:00Masterwork architecture and I'll just
- 00:44:02talk through them
- 00:44:04so here is the master worker
- 00:44:06architecture and
- 00:44:08what I'll do is I'll just quickly talk
- 00:44:10through them so this spark Master is
- 00:44:13coming from the sparkly test so the
- 00:44:15command to start this master is to go
- 00:44:18into the bin spark class and deploy the
- 00:44:20master so it's going to expose this on
- 00:44:23the part of 1990 and there's going to
- 00:44:25have another Port of 1777 so this is
- 00:44:28where the master will be communicate
- 00:44:30with the workers themselves so if you
- 00:44:32have multiple workers they'll be
- 00:44:34communicating on 70 77 70 77 with the
- 00:44:38master while if you want to access the
- 00:44:40you I will use a port 1990
- 00:44:44of course the they have to be on the
- 00:44:46same network so they can work correctly
- 00:44:48for the worker so if you have multiple
- 00:44:51worker we just replicate this okay we
- 00:44:55replicate this as one and then we call
- 00:44:57let me show you
- 00:44:59so if I copy this
- 00:45:02I'll just change the name
- 00:45:04from
- 00:45:07spark worker I'll just add maybe spark
- 00:45:10Walker 2
- 00:45:12somewhere something like this and they
- 00:45:15have the same uh
- 00:45:17configuration but the dependency that it
- 00:45:21they have against the master is going to
- 00:45:24be the same same spot class but instead
- 00:45:26of the master uh it's going to be
- 00:45:29instead of the master here all of them
- 00:45:31are going to have workers so if you have
- 00:45:33more than one worker you can just um
- 00:45:36have a the same class for them but
- 00:45:39communicating with the master on the
- 00:45:41same port uh the the same container name
- 00:45:44and the port so yeah that's how to add
- 00:45:47multiple workers but for the sake of the
- 00:45:50speed I'm going to just leave it at one
- 00:45:53and also these are the environment
- 00:45:56variable for the um
- 00:45:58for the worker so we need to have two
- 00:46:00calls and a minimum of one gig so if you
- 00:46:03reduce this one gig to maybe something
- 00:46:04like 500 you are going to have an error
- 00:46:08usually you have a minimum of one gig
- 00:46:10for the workout because they are doing
- 00:46:12the most of the work and
- 00:46:13yeah so I tested this with something
- 00:46:16less than one gig and I was having an
- 00:46:18error so it's I think it's best to have
- 00:46:19a minimum of one gig so you can also
- 00:46:21check the documentation for more
- 00:46:23information on that yeah now for the
- 00:46:25Cassandra DB we have a we're going to be
- 00:46:28using the latest the Cassandra so the
- 00:46:31container name and the hostname are the
- 00:46:32same we're exposing that on 94.2 the
- 00:46:36maximum if size is 512 and the rest so
- 00:46:39the username and password is Cassandra
- 00:46:40Cassandra
- 00:46:42so this is going to create amount of
- 00:46:44volume locally called Cassandra because
- 00:46:47of the host name it's going to mount it
- 00:46:49and this if you want to get any data
- 00:46:51into Cassandra you can do that through
- 00:46:53that point it's not necessary we don't
- 00:46:55need any volume really
- 00:46:58and that's it they have to be running on
- 00:47:00the same network so if we do a Docker
- 00:47:03compose up
- 00:47:04now we should have
- 00:47:06uh
- 00:47:08composed of detach we should have
- 00:47:12he says
- 00:47:14must be a strings postgres Network
- 00:47:17must be a string let's see services
- 00:47:20postgres
- 00:47:22Services yeah we don't need this
- 00:47:26just added when I mistakenly press the
- 00:47:28entire the other time
- 00:47:33all right so Cassandra is running
- 00:47:37and if you look at this we have a
- 00:47:41if you do Docker Pierce
- 00:47:45we have
- 00:47:48we have the Zookeeper which schema
- 00:47:50registry
- 00:47:51Let's see we have the scheduler yeah we
- 00:47:54have the spark Walker which is started
- 00:47:57and we have Cassandra the spark master
- 00:48:00yeah so the spark master and the worker
- 00:48:03I hope now 20 data engineering
- 00:48:07I'm going to create a new file
- 00:48:14I'll call it
- 00:48:17spark stream
- 00:48:19Dot py
- 00:48:22all right
- 00:48:25but if spark stream.py this is where
- 00:48:29we're going to be writing our
- 00:48:31our code for for spark streaming now we
- 00:48:36have all the other dependencies working
- 00:48:38so what we will need to do is we want we
- 00:48:40need to install the Cassandra
- 00:48:42driver
- 00:48:43so um Pi spark as well so we do a peep
- 00:48:51click install
- 00:48:53Cassandra driver
- 00:48:54so we can communicate with Cassandra
- 00:48:58and then we just need to do with our
- 00:49:01installation we need to do is this spark
- 00:49:03and Pi Spark
- 00:49:05so we need to install past spark and Pi
- 00:49:08spark uh dependencies
- 00:49:24wow Pi spark and Spark are installing
- 00:49:27I'm going to continue coding while we
- 00:49:29wait for the installation to be done
- 00:49:37all right so now that all of the
- 00:49:39dependencies are done are done I think
- 00:49:42we can continue SQL so we are importing
- 00:49:45spark session
- 00:49:48and we from this same package we are
- 00:49:50going to be importing some functions
- 00:49:53we're going to be importing
- 00:49:55are from Json type color so unless we
- 00:50:00start with those ones for now so what we
- 00:50:02want to do is we want to create a key
- 00:50:06space on Cassandra before we do anything
- 00:50:10because the key space is like a schema
- 00:50:12if for people that knows uh SQL postgres
- 00:50:17and the rest just like your public you
- 00:50:19need to have a key space and then in
- 00:50:20each key space you have multiple tables
- 00:50:23all right so we are doing a create key
- 00:50:25space
- 00:50:27is going to be done with the session
- 00:50:29I'll just put a pause in here so we we
- 00:50:32create a key space here but we need to
- 00:50:36create a connection so the dev
- 00:50:38create a table
- 00:50:40with a session so we are going to be
- 00:50:42creating a table here
- 00:50:46and then we need to uh insert the data
- 00:50:49that we we are fetching from Kafka
- 00:50:51inside data
- 00:50:54so we are going to be getting the
- 00:50:55session of course and then
- 00:50:57quarks which is a
- 00:51:00then we do the insertion here
- 00:51:04and then we establish we need to
- 00:51:06establish connection with the uh Kafka
- 00:51:09spark and Cassandra so we need to uh uh
- 00:51:13so let's say create
- 00:51:17spark connection here
- 00:51:20and then we are going to be creating
- 00:51:24spark connection
- 00:51:28create uh Cassandra
- 00:51:31connection
- 00:51:33okay
- 00:51:36and then we'll be creating
- 00:51:39Cassandra
- 00:51:41connection all right so let's start
- 00:51:44implementing this and then we work our
- 00:51:46way up so once we have a connection to
- 00:51:48Cassandra
- 00:51:49from there
- 00:51:51we create a connection to spark and then
- 00:51:54we create the key space the table and
- 00:51:58the rest so
- 00:51:59let's start with the main function so if
- 00:52:01the name
- 00:52:03equals to mean
- 00:52:08all right
- 00:52:11so we are going to have a spark
- 00:52:13connection which is going to be uh
- 00:52:16create spark connection we're going to
- 00:52:18write the implementation shortly and
- 00:52:21then in the create spark connection so
- 00:52:23what we want to do is we want to create
- 00:52:25an we want to establish connection with
- 00:52:26the spark which is going to be a try
- 00:52:30we're going to have a
- 00:52:32let's say screen
- 00:52:34it's going to be spark session
- 00:52:37so we build we we connect the Builder
- 00:52:40and then instead of just get get or
- 00:52:43create
- 00:52:44I think we're still going to need this a
- 00:52:46little bit and then we're going to have
- 00:52:48the application name
- 00:52:51so I'll say spark data streaming
- 00:52:56and
- 00:53:00you can have two dots in here can we
- 00:53:03uh we just have the config this is where
- 00:53:07things gets very interesting so with
- 00:53:10spark we need to specify the Java file
- 00:53:12that will be used so I'm going to be
- 00:53:15using some packages in here
- 00:53:18so two jars will be used to one it will
- 00:53:21be used to create connection with spark
- 00:53:23the other one with Cassandra
- 00:53:26and Kafka another so we have
- 00:53:30com.data data stacks
- 00:53:33dot Spark
- 00:53:34and the spark Cassandra
- 00:53:38connector however if you want to get
- 00:53:43access to them I think we can come into
- 00:53:45Maven Repository
- 00:53:47and we can search for a spark
- 00:53:50this is moving Repository
- 00:53:55maybe repository we can have a Cassandra
- 00:54:01connector okay
- 00:54:04I think we need to have spark Cassandra
- 00:54:07connector
- 00:54:09so from Com data Stacks we have a
- 00:54:12connector from here so if you don't have
- 00:54:14it I think you might need to download so
- 00:54:17the latest one is 3.41
- 00:54:20which I think is what we'll be using
- 00:54:23so it's just the latest release so what
- 00:54:26we need to do is we need to reference
- 00:54:28this correctly we have spark data
- 00:54:31connector we need to get the scalar
- 00:54:33version and then we we get the the
- 00:54:36version that we'll be using
- 00:54:38all right so going back to the code we
- 00:54:42are going to connect with the uh spark
- 00:54:46Cassandra connector
- 00:54:48uh the 2.13
- 00:54:52version 3.41
- 00:54:55and the other part which is going to be
- 00:54:58oh
- 00:54:59the other one which is going to be the
- 00:55:01SQL Kafka
- 00:55:03SQL Kafka connector
- 00:55:08secure Kaka
- 00:55:10yeah for this one
- 00:55:12we have a
- 00:55:14this is for structure swim streaming
- 00:55:16okay so we have a 3.41 2.132 so if you
- 00:55:21look at this guy
- 00:55:23we have access to it and you can
- 00:55:24download the Java file here include it
- 00:55:26in your price Parker uh Repository
- 00:55:31in the list of just there and you should
- 00:55:33have access to this so
- 00:55:35um
- 00:55:36this is the module which is a
- 00:55:39spark SQL kafka010213
- 00:55:43all right so going back to the code here
- 00:55:46I'm going to have a bug dot Apache
- 00:55:49dot spark and I'm going to have spark
- 00:55:53SQL Kafka
- 00:55:55zero one zero underscore 2.13
- 00:56:003.41 okay
- 00:56:022.13
- 00:56:050 1 0. okay 2.13341
- 00:56:11yeah so this is the those are these are
- 00:56:15the two packages will be uh two Java
- 00:56:17files will be needed okay
- 00:56:19so the next one
- 00:56:22is uh we just need to configure the
- 00:56:26the host name for Cassandra which is
- 00:56:28going to be Spark
- 00:56:30dot Cassandra
- 00:56:32theconnection.host
- 00:56:36and we are going to be putting that as
- 00:56:38localhost
- 00:56:41all right
- 00:56:44and
- 00:56:46and that's it this should establish a
- 00:56:48connection for us because we are going
- 00:56:50to be running this on Docker so this is
- 00:56:52going to be a broker
- 00:56:54but if you want to test this locally we
- 00:56:57do a localhost for now
- 00:57:01all right
- 00:57:02so this should give us access to
- 00:57:06to spark all right and then we just do
- 00:57:09accept
- 00:57:11exception as a e
- 00:57:14we just log the arrow
- 00:57:17we couldn't uh
- 00:57:21because I just add
- 00:57:25couldn't create
- 00:57:27the spark connection
- 00:57:30maybe session due to the exception
- 00:57:36okay and just
- 00:57:39pass the
- 00:57:41the errors argument there so with this
- 00:57:44we should be able to establish a
- 00:57:46connection
- 00:57:49all right and at this point we just do
- 00:57:54uh s con
- 00:57:57dot spark context that set log level to
- 00:58:01be I think
- 00:58:03I'll just do error
- 00:58:06plug in the info Spark
- 00:58:09connection created successfully
- 00:58:14foreign
- 00:58:23so basically this is going to create a
- 00:58:26spark connection for us
- 00:58:29so again to go to what we've just done
- 00:58:32we are creating a spark session with
- 00:58:36this name
- 00:58:37all right and then we are using these
- 00:58:40Java packages
- 00:58:41in this uh connection which is going to
- 00:58:44be the 2.13 for this black Cassandra
- 00:58:47connector and the other part is going to
- 00:58:49be spark SQL Kafka so this creates a
- 00:58:53connection for us on the
- 00:58:58yeah okay
- 00:59:02because to none
- 00:59:08so this creates a connection for us so
- 00:59:11the next thing we want to do
- 00:59:13is we want to establish connection to
- 00:59:18to Cassandra so now
- 00:59:21in here where we are establishing once
- 00:59:23we've established a spark connection the
- 00:59:26next thing for us is to establish a
- 00:59:28connection to Cassandra so we do that
- 00:59:32with cluster so we just do cluster
- 00:59:37I think we can have a cluster in here we
- 00:59:40just which has been imported
- 00:59:42uh we connect to localhost Cluster
- 00:59:47all right
- 00:59:49and this is going to be the uh
- 00:59:52yeah connecting to the cluster
- 00:59:57to Cassandra cluster all right
- 01:00:02we're going to have a session
- 01:00:05being created from the cluster we have
- 01:00:07cluster.connect so this is going to
- 01:00:09generate a session for us
- 01:00:11and with this session we can return
- 01:00:14session
- 01:00:16uh I think we should do a try catch here
- 01:00:22accept
- 01:00:23exception as e
- 01:00:26and log in the arrow
- 01:00:31could not create
- 01:00:35Cassandra
- 01:00:36connection
- 01:00:39due to
- 01:00:44e
- 01:00:45yeah
- 01:00:46I'm going to
- 01:00:48do the session here
- 01:00:51question on
- 01:00:53and yeah so once we do that
- 01:00:57if our session if sparkcon
- 01:01:00is not known then we continue okay
- 01:01:05then
- 01:01:06if that is known
- 01:01:08I think if it is not known then we
- 01:01:10establish a connection to uh Cassandra
- 01:01:13connection
- 01:01:14at this point so we just do uh this is
- 01:01:17going to be uh
- 01:01:19uh session
- 01:01:23okay
- 01:01:24I'm going to move this up here
- 01:01:32I just don't know
- 01:01:34okay
- 01:01:40all right uh
- 01:01:43this is shadowing the name outside uh
- 01:01:47just call this uh Cassandra session
- 01:01:53okay so if we have a connection we
- 01:01:55return it when if not we return this so
- 01:01:59so if
- 01:02:01session is not
- 01:02:04it's not I think we just break the
- 01:02:06connection
- 01:02:13we just return and there's no need to
- 01:02:15continue at this point
- 01:02:18I think we just do nothing uh
- 01:02:23oh instead of that we just do if it is
- 01:02:26not known so we only enter that if this
- 01:02:29is not known so what we want to do is we
- 01:02:31want to create a key space okay with the
- 01:02:33session that we have
- 01:02:35and then we want to create a table with
- 01:02:38our session
- 01:02:41for the key space we just come in here
- 01:02:43we do a session dot execute
- 01:02:55so even if we run this multiple times
- 01:02:58because we added this command if it if
- 01:03:01it does not exist it only runs once
- 01:03:04so we do the same for table
- 01:03:24now to insert the data into
- 01:03:27uh into the queue into Cassandra
- 01:03:31we are going to extract the
- 01:03:35data from the quarks
- 01:03:57all right so this gives us access to uh
- 01:04:01the data from there and then we can try
- 01:04:04to insert it now
- 01:04:06which is going to be session dot execute
- 01:04:30then
- 01:04:32the last bit is uh
- 01:04:35the only thing we need to do is we need
- 01:04:37to connect to Kafka
- 01:04:39so we can extract those information from
- 01:04:42Kafka so the last Parts I think is going
- 01:04:45to be uh
- 01:04:47depth
- 01:04:51I think we need to connect to
- 01:04:56to Kafka
- 01:05:00so we we are using the spark connection
- 01:05:03so we are going to have a spark DF
- 01:05:27so the spark basically what we're doing
- 01:05:30here is we are connecting to this pack
- 01:05:33uh by using spark connection to read
- 01:05:36data from Kafka on this uh on this
- 01:05:40server
- 01:05:41with uh the users created topic and
- 01:05:44we're starting from the beginning load
- 01:05:46it and we are returning that data frame
- 01:05:48from there so I think that's what we're
- 01:05:52doing at that point
- 01:05:54uh let's see what's the
- 01:05:58yeah of course I think it's just the
- 01:06:01this person
- 01:06:06yeah I think that's the last person
- 01:06:08issues
- 01:06:11um
- 01:06:13yeah I think that's pretty much it so
- 01:06:15when we create the Cassandra connection
- 01:06:17yeah
- 01:06:19uh yeah that's all
- 01:06:21so
- 01:06:23I think we can test this out
- 01:06:29uh once we connected Kafka so we need to
- 01:06:32get into that frame
- 01:06:33from there
- 01:06:35so when we connected to spark
- 01:06:39here
- 01:06:41we need to get the data frame
- 01:06:44so I'm going to call this uh DF
- 01:06:48and connect to Kafka which is spark
- 01:06:50connection
- 01:06:52so connect to
- 01:06:56Kafka with spark connection
- 01:07:00this is where we are creating
- 01:07:03create spark connection
- 01:07:07all right and then
- 01:07:10this is uh pretty much descriptive
- 01:07:12and we create a key space click the
- 01:07:15table
- 01:07:16so instead of inserting directly I think
- 01:07:19we need to write do a stream so we're
- 01:07:21not doing it just once we are doing it
- 01:07:23uh in stream so we just have a streaming
- 01:07:26query
- 01:07:27at this point which is going to be
- 01:07:30I I think we need to select the data
- 01:07:36and do selection DF it's going to be a
- 01:07:40I think
- 01:07:42you need to have a function
- 01:07:45because once we have this data frame we
- 01:07:47need to structure it in a way to insert
- 01:07:50into Cassandra okay and the way to do
- 01:07:53that is to have a struct fields which is
- 01:07:57going to be create a selection
- 01:08:01data frame from Kafka
- 01:08:06okay and then we have a spark data frame
- 01:08:10so this is going to we need the schema
- 01:08:13at this point it's going to have a extra
- 01:08:17type
- 01:08:20construct type
- 01:08:46so what we're doing basically is to
- 01:08:48create a schema so when we read data
- 01:08:50from the spark data frame we are
- 01:08:52converting whatever we read into this
- 01:08:54particular schema and then we are
- 01:08:56selecting data from it we are selecting
- 01:08:58everything from that column so if you
- 01:09:00have let's say hundred one thousand one
- 01:09:03million records whatever
- 01:09:05it's going to select all of them from
- 01:09:07the Kafka queue right once it selects
- 01:09:09them even if it is just one record
- 01:09:12it selects them and format it in this
- 01:09:14particular schema and select the data
- 01:09:16from it to be inserted so we just need
- 01:09:19to do a selection
- 01:09:22data frame which is going to be uh
- 01:09:25create selection data frame from Kafka
- 01:09:28and then we're using the spark
- 01:09:30data frame which is the DF that we're
- 01:09:34getting from here
- 01:09:40yeah we're using that
- 01:09:42and uh
- 01:09:44yeah so we just need to get our
- 01:09:46selection data frame and do a right
- 01:09:49stream from that
- 01:09:52in the format of where we are writing to
- 01:09:55bug dot Apache Dot spark.sql.cassandra
- 01:10:02option to check we need to have a
- 01:10:05checkpoint location this is not
- 01:10:07necessary this is not important uh it's
- 01:10:11not compulsory but it's just to have a
- 01:10:14checkpoint in case of failure
- 01:10:16and I'll just have a better option
- 01:10:21it can have a key space
- 01:10:26option key space is going to be
- 01:10:29spark strings
- 01:10:35because you just put everything in it
- 01:10:38yeah so we don't have to have
- 01:10:41just have option
- 01:10:43the table is gonna be
- 01:10:46created users
- 01:10:48and the last thing we need to do is
- 01:10:50start this streaming yeah
- 01:10:54and that's all we need to do
- 01:10:58um so
- 01:11:02we can just do streaming but uh weights
- 01:11:05termination
- 01:11:07there are some examples here
- 01:11:10uh this one by two spark and you can see
- 01:11:14everything that I wrote in there is
- 01:11:16somewhere here so if you have multiple
- 01:11:18Brokers you do something like that yeah
- 01:11:20so everything is like that so you can
- 01:11:22just uh maybe come in here do a copy
- 01:11:25paste and all that but yeah that's this
- 01:11:27is where I got most of the code from and
- 01:11:30I'm just uh replicating that the on the
- 01:11:33code
- 01:11:34yeah so let's let's pin back our uh
- 01:11:38Docker containers
- 01:11:41and then detach them
- 01:11:43[Music]
- 01:11:46I think we we have an arrow somewhere
- 01:11:49here
- 01:11:51hello
- 01:11:52yeah
- 01:11:53it's a coma
- 01:12:00in your arrows
- 01:12:02can I see one of that red
- 01:12:06guys just uh
- 01:12:08this is fine this is a public uh
- 01:12:11the key space name so it's fine I
- 01:12:13couldn't recognize it so it's okay
- 01:12:17yeah I think while we wait for that to
- 01:12:20come up and that's all we need to do
- 01:12:30right now that the installation is done
- 01:12:32our web server is back up and running
- 01:12:35and uh I should lies back it's also up
- 01:12:39and running let's see Chevrolet is also
- 01:12:42listening
- 01:12:43uh also we you need to see why our
- 01:12:46control center has gone down
- 01:12:49I think we just need to start it up
- 01:12:50there's really no need
- 01:12:52uh we don't need the control center at
- 01:12:54this point because we know that our data
- 01:12:56will always be produced and we don't
- 01:12:58need to visualize them anymore
- 01:13:00even we don't need the schema registry
- 01:13:03anymore because that's connected to the
- 01:13:05control center so we only need the
- 01:13:09the the Kafka which is uh our broker to
- 01:13:13go
- 01:13:13and uh that we're able to connect to it
- 01:13:16so
- 01:13:18let's see if our web server
- 01:13:20is up and running
- 01:13:24I guess it is we just do admin admin to
- 01:13:27sign in
- 01:13:31and uh we just go in here
- 01:13:36to see the
- 01:13:37dog
- 01:13:39and we have it stream data from API good
- 01:13:42so what we need to do
- 01:13:45is we
- 01:13:47we need to
- 01:13:50check our Cassandra Eva Cassandra is
- 01:13:53open running but we need to do a run of
- 01:13:57this spark stream to see if we able to
- 01:13:59even connect to
- 01:14:01to spark stream at all
- 01:14:06so it's going to run at the entry points
- 01:14:09yeah
- 01:14:14and try to establish a connection
- 01:14:23okay so I think with this dependencies
- 01:14:26because I don't have them before
- 01:14:29these two guys uh I think it should
- 01:14:33download them
- 01:14:35so he says the artifact does not exist
- 01:14:38on risk of dependencies you couldn't see
- 01:14:40Cassandra connector 213
- 01:14:4414 let's see Cassandra connector so
- 01:14:48let's go into moving
- 01:14:50Maven repository spark Cassandra
- 01:14:52connector
- 01:14:54two one three three four one oh it's
- 01:14:583.4.1
- 01:15:00yeah
- 01:15:02because that's it uh we just need to
- 01:15:04rerun this
- 01:15:14and I think
- 01:15:17yeah with this it's a
- 01:15:20let's see if it connects to it
- 01:15:25this is the server error mismatched in
- 01:15:29Beauty and the file
- 01:15:30so we need to establishing a connection
- 01:15:34uh trying to execute
- 01:15:38yeah I didn't close these uh
- 01:15:41I didn't close that uh
- 01:15:45this uh
- 01:15:46this bracket
- 01:15:49I think did I do the same for the key
- 01:15:52space
- 01:15:53yeah the key space is fine it's only the
- 01:15:55table so I need to just rerun that
- 01:16:10all right so we're able to create a key
- 01:16:12space
- 01:16:14let's see we're able to create a key
- 01:16:15space I'm able to create a key uh a
- 01:16:18table
- 01:16:20so the only thing that is left is that
- 01:16:22uh
- 01:16:23is uh it says uh an error call while
- 01:16:28publishing this to do to data
- 01:16:31stacks.login so
- 01:16:33that there are two ways to this so we
- 01:16:36need to we need to it was starting the
- 01:16:38the stream so we need to go into uh
- 01:16:43because we are using this uh this uh jav
- 01:16:47files
- 01:16:49in here we need to add a dependency if
- 01:16:52you look at our VMV
- 01:16:54you know I've been I think you know I
- 01:16:57think in the lips
- 01:16:59in the leaves and you check a buy spark
- 01:17:02we need to download those uh
- 01:17:06price you need to download those
- 01:17:08Java files in there if you look at the
- 01:17:11jars in here
- 01:17:13we need to find uh
- 01:17:15spark SQL
- 01:17:18and we we can't find them here
- 01:17:21okay so we need to add them to this I
- 01:17:25think I already did the download so I'm
- 01:17:27going to copy paste them
- 01:17:30and while I do that we can do uh we can
- 01:17:34go into the Cassandra and uh
- 01:17:37check so we we go into the Cassandra and
- 01:17:40then we have the
- 01:17:42interactive terminal with the use this
- 01:17:45is determinant uh this is the container
- 01:17:49name
- 01:17:49now we're going into SQL sh with the
- 01:17:51username of Cassandra password of
- 01:17:53Cassandra the localhost is uh is the
- 01:17:57iPad is the IP and then this is the port
- 01:17:59so if you exit if you go into that we
- 01:18:03have access to the SQL sh and we can do
- 01:18:06a describe
- 01:18:11sparkstreams.created users
- 01:18:14and we have
- 01:18:16decoded users outside the uuid and the
- 01:18:19rest and these are the details of the
- 01:18:22data that we have so if we do a select
- 01:18:25stuff from spark streams created users
- 01:18:28we have an ID address and the rest of
- 01:18:31the data which is good so we have access
- 01:18:33to to Cassandra data directly so all we
- 01:18:38need to do now is by the time we are
- 01:18:40streaming data from uh Kafka we need to
- 01:18:45push this data to Cassandra so the only
- 01:18:47dependencies that we we need to add
- 01:18:50are those Java files which you need to
- 01:18:52download for now I will just exit and do
- 01:18:55a Docker compost down to remove all
- 01:18:57these existing images
- 01:18:59then we continue from there already copy
- 01:19:01the Java files so what I'm going to do
- 01:19:03I'm going to copy paste steps into the
- 01:19:06Jazz directory so we can have a
- 01:19:08dependencies on them
- 01:19:10it's asking me to click OK yeah I just
- 01:19:13copy them so the two Java files have
- 01:19:16been copied and you can see them there
- 01:19:18one is the sparkassandra connector and
- 01:19:21then the other one is pack SQL Kafka
- 01:19:24connector
- 01:19:36what we need to do now is start up the
- 01:19:38docker compose
- 01:19:40now we do Docker compose of detached
- 01:19:43we can start up the data card compose
- 01:19:46the containers are starting up and you
- 01:19:49can see that uh our web server is having
- 01:19:51an error what's the arrow I suppose that
- 01:19:54might be the entry point but let's see
- 01:19:57uh web server if you click on that to
- 01:20:01scroll to the bottom and yeah I
- 01:20:04suspected uh command no found M so if we
- 01:20:07go back to entry point uh we have a an M
- 01:20:10when I'm trying to install upgrade the
- 01:20:12PIP now just delete that and try again I
- 01:20:16guess
- 01:20:17it's going to start the web server
- 01:20:21and then we can check again I just clear
- 01:20:23this
- 01:20:24yeah okay I think it's starting up
- 01:20:27so if you go outside of here
- 01:20:30we need to stop the control center and
- 01:20:35uh the schema registry
- 01:20:38so once we stop the schema registry the
- 01:20:41control center is not going to be coming
- 01:20:42up because of the dependence
- 01:20:44anyways so the next thing we need to do
- 01:20:46is we need to go to the UI and check our
- 01:20:50Spark
- 01:20:51the status of our spark session so going
- 01:20:55back to
- 01:20:56the UI you go to localhost 1990 just
- 01:21:00refresh this
- 01:21:02yeah and you can see our spark Master
- 01:21:05the IP address which is the localhost
- 01:21:07really is just the IP address of the
- 01:21:09docker content itself that is running it
- 01:21:11so we have the worker ID you have
- 01:21:13there's no running application no
- 01:21:15completed application and you can see
- 01:21:18that so we go back and then we submit a
- 01:21:20job our first job to that but before we
- 01:21:22do that I think we need to comment out
- 01:21:24the streaming part of the the spark
- 01:21:27system
- 01:21:28where we're streaming into Cassandra for
- 01:21:30now I'll just commit these two out
- 01:21:35yeah
- 01:21:36then we can try we want to see whether
- 01:21:38the key space on the table will be
- 01:21:40automatically created
- 01:21:54if we do a Refresh on the UI yeah okay
- 01:21:57so we can see the ID
- 01:21:59the ID is uh and the name is practical
- 01:22:03streaming it's been running for 14
- 01:22:04seconds
- 01:22:05so if you click on the the application
- 01:22:08you can see the worker that is running
- 01:22:09it and the worker ID the number of calls
- 01:22:13the memory and the rest
- 01:22:15so going back to the time now we can see
- 01:22:17the key space successfully created and
- 01:22:19table created successfully but is the
- 01:22:23connection yeah the connection it says
- 01:22:26it's saying an error what's going on
- 01:22:29let's look into the docker container
- 01:22:32uh so I think we just need to do Docker
- 01:22:36exec it to see what is going on
- 01:22:39and I think the script is fine I'll just
- 01:22:42do Docker exec and check it
- 01:22:47and I'll do it select star from there
- 01:22:53is Spark
- 01:22:55yeah I think it's fine we just exceed
- 01:22:57that uh we'll resubmit the job and check
- 01:23:01again if it is fine or not
- 01:23:09okay
- 01:23:10I think it's coming back up
- 01:23:18yeah let's check can we see any error it
- 01:23:20says successfully started the service
- 01:23:23connecting to master
- 01:23:26yeah I think I think you successfully
- 01:23:28connected and if you look at the data
- 01:23:31frame that we created which is the
- 01:23:32structure into spark which is fine and
- 01:23:35Cassandra is fine the data frame is okay
- 01:23:38and uh yeah I think we are good
- 01:23:43really
- 01:23:44right
- 01:23:46so the next thing we need to do is we
- 01:23:50need to
- 01:23:52submit a job and enable the streaming to
- 01:23:55Cassandra
- 01:23:57so if we refresh this check the UI and
- 01:24:00refresh
- 01:24:02I just delete this and go to the 1990
- 01:24:06refresh this part
- 01:24:08I think we need to log in because we
- 01:24:10already spinning down from right okay we
- 01:24:13don't
- 01:24:14I'll click on that okay let's start it
- 01:24:21trigger dog
- 01:24:24so it's coming out
- 01:24:27now that the key space and the table has
- 01:24:30been created we can comment this part
- 01:24:33and resubmit the job so I added a login
- 01:24:37info so we can see when the streaming is
- 01:24:39being started and you can see the key
- 01:24:41space and the table has been created
- 01:24:43added this part
- 01:24:45and then so what we're going to be doing
- 01:24:47is we're going to be submitting this job
- 01:24:50back to a budget Spark
- 01:24:55then
- 01:24:57simultaneously we're going to trigger
- 01:24:59the dark so we can start seeing the data
- 01:25:02that has been produced the cap to Kafka
- 01:25:04queue and this is the last one that was
- 01:25:07running so I'm going to retrigger this
- 01:25:09so we have a second run and once that is
- 01:25:12done we we check we do a double check on
- 01:25:16the on Cassandra to see if we are
- 01:25:18getting streams of data into Cassandra
- 01:25:20well right now
- 01:25:22right now we need to get the data into
- 01:25:25Kafka first
- 01:25:27by triggering this dag which is
- 01:25:29currently running
- 01:25:30then we go from there into into
- 01:25:34Cassandra
- 01:25:38I'm going to open a new terminal
- 01:25:42well we doing
- 01:25:45an execution into the terminal of
- 01:25:48Cassandra so I'm going to do a Docker
- 01:25:50exec Dash it into Cassandra and I'm
- 01:25:54going to be running the SQL statement
- 01:25:58that we're going to be using so this
- 01:26:02the producing is done
- 01:26:04when we trigger The Jug so the next
- 01:26:06thing we need to do is check uh I'll
- 01:26:08just do select star from there select
- 01:26:12stuff from
- 01:26:14that would be either spark streams
- 01:26:17dot uh
- 01:26:19created users
- 01:26:22yeah and you can see we have initially
- 01:26:2559 rules
- 01:26:27if we go in there and do a select star
- 01:26:30from
- 01:26:31I'm going to just select the first name
- 01:26:33and the last name so we can I think it's
- 01:26:36a little bit more so I'll just do select
- 01:26:39first name
- 01:26:40and last name from
- 01:26:43from spark stream to created users
- 01:26:48and you can see the username and
- 01:26:49password uh the first name and the last
- 01:26:52name of the users that have been
- 01:26:53streamed into Cassandra so basically
- 01:26:57that's how to do it and if we trigger
- 01:27:01more we see more records and uh yeah
- 01:27:05that kind of caps you though
- 01:27:07basically I think we've satisfied all
- 01:27:09the items on our architecture we have
- 01:27:11the API and from API to Cassandra we
- 01:27:15started with the Apache airflow which is
- 01:27:17coming pulling the data from the API and
- 01:27:21once we trigger that process it streams
- 01:27:23data into Kafka then to Apache spark
- 01:27:26then into Cassandra and there you have
- 01:27:29it folks and end-to-end data engineering
- 01:27:32pipeline from data ingestion to
- 01:27:35processing and storage all containerized
- 01:27:38with Docker find this video useful don't
- 01:27:41forget to give it a thumbs up and
- 01:27:43subscribe for more content like this
- 01:27:44until next time happy engineering
- Data Engineering
- Apache Airflow
- Kafka
- Cassandra
- Docker
- PostgreSQL
- Zookeeper
- Spark
- Random User API
- Containerization