Realtime Data Streaming | End To End Data Engineering Project

01:27:47
https://www.youtube.com/watch?v=GqAcTrqKcrY

Summary

TLDR: The video explores a data engineering project that uses several advanced technologies, including Apache Airflow, Zookeeper, Kafka, Cassandra and PostgreSQL, all containerized with Docker. It starts with a random user generation API that provides fake data, which is then fetched by Airflow and processed in a workflow that moves through Kafka. The system architecture includes an Airflow DAG for intermittent data retrieval, with the data sent to a Kafka queue managed by Zookeeper. The data is visualized in a Control Center and organized by a Schema Registry. Apache Spark handles data processing, with Cassandra acting as the final storage destination. Everything runs in Docker containers, promoting efficient management and deployment of interdependent services. The tutorial covers everything from configuring and running the containers to implementing and connecting the different databases, demonstrating a complete flow from data ingestion to final storage.

Takeaways

  • 🔧 A complete data engineering project.
  • 📡 Uses advanced technologies such as Apache Airflow and Kafka.
  • 🐳 The system is fully containerized with Docker.
  • 📊 Data visualization through the Kafka Control Center.
  • ⚙️ Apache Spark for data processing.
  • 🔑 Schema Registry for managing Avro schemas.
  • 🎛️ Apache Zookeeper for managing Kafka brokers.
  • 🔄 Smooth interaction and integration between multiple systems.
  • 📋 Python used for scripting and operations.
  • 🔒 Processed data stored in Cassandra.

Timeline

  • 00:00:00 - 00:05:00

    The video opens with the presenter greeting viewers and introducing the topic, aimed at people interested in data engineering. He promises to show an end-to-end data engineering project using technologies such as Apache Airflow, Zookeeper, Kafka, Cassandra and PostgreSQL, all containerized with Docker.

  • 00:05:00 - 00:10:00

    The presenter gives an overview of the system architecture. He starts by describing the Random User Generator API, which produces fictitious data about people; an Apache Airflow DAG fetches this data intermittently and sends it on to Apache Kafka.

  • 00:10:00 - 00:15:00

    He continues explaining the data flow from Apache Kafka, using Apache Zookeeper to manage multiple brokers, through to Apache Spark for processing. The containers are managed with Docker. He closes the overview by explaining how the data is visualized in the Control Center and why the Schema Registry matters for interpreting the data in Kafka.

  • 00:15:00 - 00:20:00

    Implementation of the data-ingestion part begins. The presenter shows how to set up a Python project to work with the Random User API, fetches data from the API, formats it and prints it in the terminal, and prepares Airflow's PythonOperator to wire the task into the default configuration (see the get_data/format_data sketch after this timeline).

  • 00:20:00 - 00:25:00

    The implementation moves on to building the workflow in Apache Airflow. The presenter creates the necessary files and configures a PythonOperator to handle the data flow from the API, setting basic parameters such as the start date and the owner of the DAG (see the DAG sketch after this timeline).

  • 00:25:00 - 00:30:00

    The presenter shows how to send data to Apache Kafka from the DAG created in Apache Airflow. He installs the required dependencies and configures a Kafka producer to send the data to a topic called 'users-created' (see the producer sketch after this timeline).

  • 00:30:00 - 00:35:00

    He finishes the Kafka setup by adding configuration to the docker-compose file to bring up the required services, such as Apache Zookeeper, the Kafka broker and related services, making sure they can all communicate with each other.

  • 00:35:00 - 00:40:00

    Errors in the provided docker-compose configuration are fixed to ensure proper connectivity between all the containers, including Apache Airflow and the Kafka services.

  • 00:40:00 - 00:45:00

    The presenter tests the system, confirming that Apache Airflow and Apache Kafka work correctly by watching messages arrive in the Control Center.

  • 00:45:00 - 00:50:00

    He starts bringing in Apache Spark, first making sure that all the Apache Kafka components and the Airflow environment are running correctly, then continues with the Spark code, which will process data from Kafka using a master-worker architecture.

  • 00:50:00 - 00:55:00

    He explains the configuration of the Spark containers in docker-compose, including the master and the workers, and shows how to configure additional workers if desired.

  • 00:55:00 - 01:00:00

    He moves on to the storage layer with Apache Cassandra, configuring docker-compose to bring up a Cassandra instance that will receive the data processed by Apache Spark.

  • 01:00:00 - 01:05:00

    He details the complete workflow, including the configuration that lets Apache Spark read data from Kafka and run transformations before inserting the records into Apache Cassandra (see the Cassandra and Spark streaming sketches after this timeline).

  • 01:05:00 - 01:10:00

    He resolves errors in the process described, especially around connection handling and the integration between Apache Spark and Cassandra, making sure the connections are established correctly and the data is received.

  • 01:10:00 - 01:15:00

    He walks through the final steps, verifying that the processed data was inserted into Apache Cassandra successfully and wrapping up the end-to-end data engineering pipeline.

  • 01:15:00 - 01:20:00

    The video wraps up with a review of the whole pipeline, confirming that every component is integrated and working effectively. He encourages viewers to stick around for more similar content, highlighting the usefulness of the project shown.

  • 01:20:00 - 01:27:47

    The video ends with the presenter thanking viewers for following the tutorial and inviting them to share the video and subscribe for future content on data engineering and similar projects.
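
The sketches below are rough Python illustrations of the steps described in the timeline; they are reconstructions based on the summary and the auto-generated transcript, not the video's exact code. First, fetching and formatting a record from the Random User API. The exact set of fields kept by the video's format_data function is an assumption, and the id field is added here only so the later Cassandra sketch has a primary key.

    import json
    import uuid

    import requests

    def get_data():
        # Call the Random User Generator API and keep only the first record
        # from the "results" array; the "info" block is not needed.
        res = requests.get("https://randomuser.me/api/")
        res = res.json()
        return res["results"][0]

    def format_data(res):
        # Flatten the nested API response into the record we want to push to Kafka.
        location = res["location"]
        return {
            "id": str(uuid.uuid4()),  # assumed surrogate key for the Cassandra table
            "first_name": res["name"]["first"],
            "last_name": res["name"]["last"],
            "gender": res["gender"],
            "address": f"{location['street']['number']} {location['street']['name']}, "
                       f"{location['city']}, {location['country']}",
            "email": res["email"],
            "username": res["login"]["username"],
            "picture": res["picture"]["medium"],
        }

    if __name__ == "__main__":
        # Quick local test, mirroring the terminal check in the video.
        print(json.dumps(format_data(get_data()), indent=3))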
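
Next, the Airflow DAG that wires the streaming function into a PythonOperator. The DAG id 'user_automation', the @daily schedule, the September 3, 2023 start date and catchup=False come from the walkthrough; the owner value is taken from the transcript and may differ.

    from datetime import datetime

    from airflow import DAG
    from airflow.operators.python import PythonOperator

    default_args = {
        "owner": "airscholar",                       # owner mentioned in the transcript
        "start_date": datetime(2023, 9, 3, 10, 0),   # 2023-09-03 10:00
    }

    def stream_data():
        # Placeholder: in the project this fetches from the Random User API and
        # pushes formatted records to Kafka (see the producer sketch below).
        ...

    with DAG(
        "user_automation",
        default_args=default_args,
        schedule_interval="@daily",
        catchup=False,
    ) as dag:
        streaming_task = PythonOperator(
            task_id="stream_data_from_api",
            python_callable=stream_data,
        )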
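
The streaming function itself builds a KafkaProducer (kafka-python) and publishes formatted records to the 'users-created' topic for roughly one minute, logging and skipping any failures. Inside the Docker network the video points the producer at broker:29092; when testing from the host it uses localhost:9092. This sketch reuses get_data and format_data from the first block.

    import json
    import logging
    import time

    from kafka import KafkaProducer

    def stream_data():
        # Produce Random User records to Kafka for about one minute.
        producer = KafkaProducer(
            bootstrap_servers=["broker:29092"],   # ["localhost:9092"] when run outside Docker
            max_block_ms=5000,
        )
        start = time.time()

        while True:
            if time.time() > start + 60:          # stop after ~1 minute of streaming
                break
            try:
                data = format_data(get_data())
                producer.send("users-created", json.dumps(data).encode("utf-8"))
            except Exception as e:
                logging.error(f"An error occurred: {e}")
                continue

        producer.flush()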
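
On the storage side, a keyspace and table are created in Cassandra before Spark starts writing. A minimal sketch using the cassandra-driver package; the keyspace and table names ('spark_streams', 'created_users') are assumptions for illustration, and the cassandra/cassandra credentials are the ones mentioned in the transcript.

    from cassandra.auth import PlainTextAuthProvider
    from cassandra.cluster import Cluster

    def create_keyspace(session):
        # A keyspace plays roughly the same role as a schema in PostgreSQL.
        session.execute("""
            CREATE KEYSPACE IF NOT EXISTS spark_streams
            WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
        """)

    def create_table(session):
        session.execute("""
            CREATE TABLE IF NOT EXISTS spark_streams.created_users (
                id TEXT PRIMARY KEY,
                first_name TEXT,
                last_name TEXT,
                gender TEXT,
                address TEXT,
                email TEXT,
                username TEXT,
                picture TEXT);
        """)

    if __name__ == "__main__":
        # The docker-compose file exposes Cassandra on localhost:9042.
        auth = PlainTextAuthProvider(username="cassandra", password="cassandra")
        cluster = Cluster(["localhost"], port=9042, auth_provider=auth)
        session = cluster.connect()
        create_keyspace(session)
        create_table(session)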
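
Finally, Spark Structured Streaming reads the topic, parses the JSON payload and streams the rows into that Cassandra table. This sketch assumes the spark-sql-kafka and spark-cassandra-connector packages (the version numbers below are illustrative) and the same topic, keyspace and table names as above.

    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, from_json
    from pyspark.sql.types import StringType, StructField, StructType

    # Expected shape of each Kafka message (mirrors format_data above).
    schema = StructType([
        StructField("id", StringType()),
        StructField("first_name", StringType()),
        StructField("last_name", StringType()),
        StructField("gender", StringType()),
        StructField("address", StringType()),
        StructField("email", StringType()),
        StructField("username", StringType()),
        StructField("picture", StringType()),
    ])

    spark = (SparkSession.builder
             .appName("SparkDataStreaming")
             .config("spark.jars.packages",
                     "com.datastax.spark:spark-cassandra-connector_2.12:3.4.1,"
                     "org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.1")
             .config("spark.cassandra.connection.host", "localhost")
             .getOrCreate())

    # Subscribe to the topic the Airflow task produces to.
    kafka_df = (spark.readStream
                .format("kafka")
                .option("kafka.bootstrap.servers", "localhost:9092")
                .option("subscribe", "users-created")
                .option("startingOffsets", "earliest")
                .load())

    # The Kafka value is JSON bytes; cast it to a string and explode it into columns.
    users_df = (kafka_df.selectExpr("CAST(value AS STRING)")
                .select(from_json(col("value"), schema).alias("data"))
                .select("data.*"))

    # Stream the parsed rows into the Cassandra table created earlier.
    query = (users_df.writeStream
             .format("org.apache.spark.sql.cassandra")
             .option("checkpointLocation", "/tmp/checkpoint")
             .option("keyspace", "spark_streams")
             .option("table", "created_users")
             .start())

    query.awaitTermination()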

Video Q&A

  • Which API is used to generate random user data?

    The Random User Generator API is used to generate random user data.

  • What is the purpose of Apache Zookeeper in this project?

    Zookeeper acts as the manager for Apache Kafka, coordinating the multiple Kafka brokers.

  • What is Apache Airflow used for?

    Apache Airflow is used to orchestrate the extraction of data from the API and its processing.

  • How is the data in Kafka visualized?

    The data in Kafka is visualized through a Control Center, which provides an interface for viewing the Kafka topics and messages.

  • What does the Schema Registry do?

    The Schema Registry provides an interface for storing and retrieving Avro schemas, which helps when visualizing the Kafka data streams.

  • What role does Apache Spark play in the system?

    Apache Spark is used to process the data obtained from Kafka and then send it to Cassandra.

  • How is the system containerized?

    The entire system is containerized with Docker, making it easy to spin up and manage.

  • What are the main languages used to develop this project?

    The main language is Python for the scripts and configuration, with Docker used for containerization.

  • Which databases are used in this project?

    PostgreSQL is used to store configuration data and Cassandra to store the processed data.

  • What is Docker Compose used for in this project?

    Docker Compose is used to define and run multiple Docker containers, simplifying the setup and management of the project.

Subtitles
  • 00:00:00
    thank you
  • 00:00:00
    [Music]
  • 00:00:14
    hello and welcome back to the channel
  • 00:00:16
    today's video is for people that are
  • 00:00:19
    passionate about data engineering if you
  • 00:00:22
    are looking to build scalable and robust
  • 00:00:23
    system
  • 00:00:25
    then today's video is just for you
  • 00:00:27
    before we continue don't forget to
  • 00:00:29
    subscribe to the channel and ring the
  • 00:00:31
    notification Bell so you don't miss out
  • 00:00:33
    on any future video
  • 00:00:35
    in today's episode I'll be walking you
  • 00:00:37
    through an end-to-end data engineering
  • 00:00:39
    project that stitches together several
  • 00:00:41
    essential Technologies like Apache
  • 00:00:44
    airflow Apache zookeeper Kafka Cassandra
  • 00:00:47
    and postgres SQL all containerized with
  • 00:00:50
    Docker so let's Dive Right In and get
  • 00:00:53
    started let's take a look at the high
  • 00:00:55
    level view of the system architecture we
  • 00:00:57
    start by having an API which is the
  • 00:00:59
    random user generator API this is used
  • 00:01:01
    for generating random user data
  • 00:01:04
    it's like lorem ipsum but this is
  • 00:01:07
    basically for people
  • 00:01:09
    all this data is not real and these
  • 00:01:13
    are not real people
  • 00:01:15
    this is how to use the API and this is
  • 00:01:17
    the results that we're expecting from
  • 00:01:19
    the API itself so going back to the
  • 00:01:21
    architecture we are going to have a dag
  • 00:01:23
    in Apache airflow which is fetching data
  • 00:01:26
    from this API intermittently
  • 00:01:29
    the data fetch is going to be streamed
  • 00:01:33
    into a Kafka queue by the way Apache airflow
  • 00:01:36
    with our configuration is going to be
  • 00:01:38
    running on postgres SQL
  • 00:01:41
    now the data is that we get from this
  • 00:01:43
    API is going to be streamed into Kafka
  • 00:01:45
    which is sitting on Apache zookeeper
  • 00:01:49
    and the Zookeeper is the manager to
  • 00:01:51
    manage all the multiple Brokers that we
  • 00:01:53
    have on Kafka so if we have multiple
  • 00:01:55
    Brokers let's say we have five or three
  • 00:01:58
    brokers for Kafka, zookeeper
  • 00:02:01
    is going to be the manager so when one
  • 00:02:02
    goes down another one replaces it
  • 00:02:04
    or restarts the process itself
  • 00:02:07
    then the data inside the Kafka brokers
  • 00:02:10
    will be visualized in our control center
  • 00:02:13
    the control center serves as a UI where
  • 00:02:16
    we can see what is going on in our Kafka
  • 00:02:18
    brokers the number of messages that
  • 00:02:20
    are coming into different topics and the
  • 00:02:22
    different topics on the Kafka queue while
  • 00:02:24
    the schema registry it provides us a
  • 00:02:26
    serving layer for the metadata the schema
  • 00:02:29
    registry is a restful interface for
  • 00:02:31
    storing and retrieving Avro schemas
  • 00:02:33
    which is particularly useful when the
  • 00:02:36
    Kafka streams uh is being visualized so
  • 00:02:40
    we can understand the schema of the
  • 00:02:41
    records all the data that is coming into
  • 00:02:43
    Kafka
  • 00:02:44
    so the data that we get from Kafka will
  • 00:02:47
    be streamed
  • 00:02:48
    with um Apache Spark
  • 00:02:51
    we have a master worker architecture
  • 00:02:53
    being set up on Apache Spark when a job
  • 00:02:57
    is submitted to the master
  • 00:02:59
    the master decides which of the worker
  • 00:03:01
    takes up the job and run the data run
  • 00:03:05
    the task
  • 00:03:07
    once task is run the task in this case
  • 00:03:09
    will be to stream data into Cassandra so
  • 00:03:12
    we have a listener
  • 00:03:14
    that is going to be getting data from
  • 00:03:16
    Kafka into spark then streamed directly
  • 00:03:19
    into Cassandra
  • 00:03:21
    all this architecture are running on
  • 00:03:24
    Docker containers which you are going to
  • 00:03:26
    have a single Docker compose file
  • 00:03:29
    that helps us to spin up all these
  • 00:03:33
    architecture that's the basic
  • 00:03:35
    architecture we have right now and
  • 00:03:38
    we can dive right into
  • 00:03:40
    the first level getting data from the
  • 00:03:44
    random user API without a dag
  • 00:03:47
    I'm going to start by creating a new
  • 00:03:49
    project called Data engineering
  • 00:03:55
    I'm using python 3.11
  • 00:03:59
    so you can use any python I think the
  • 00:04:02
    minimum version
  • 00:04:03
    that may be supported should be 3.9 so
  • 00:04:06
    but in my case I'll be using 3.11
  • 00:04:13
    in this environment we have a main.py
  • 00:04:17
    which we can we're going to be using
  • 00:04:19
    this terminal a lot so it's best to
  • 00:04:21
    ensure that our terminal is properly set
  • 00:04:23
    up I'm going to start by deactivating
  • 00:04:25
    the current Source then I'm going to
  • 00:04:26
    Source it again
  • 00:04:28
    I'm going to increase the size of the UI
  • 00:04:30
    so it's visible to everybody so I'm
  • 00:04:33
    going to have a
  • 00:04:34
    my editor I'm going to have a font I'm
  • 00:04:37
    going to increase it to 20.
  • 00:04:40
    okay
  • 00:04:42
    all right
  • 00:04:43
    with this if I do python main.py
  • 00:04:47
    I python good so that means the
  • 00:04:49
    environment is properly set up and if I
  • 00:04:52
    check which python
  • 00:04:54
    is in my let's see the right directory
  • 00:04:56
    good
  • 00:04:57
    all right I don't need to do something
  • 00:04:59
    py so I'm going to delete it
  • 00:05:03
    I'm going to create a new folder
  • 00:05:06
    I'll call the folder dags
  • 00:05:10
    in because we've activated our venv so we
  • 00:05:13
    need to install the base package that
  • 00:05:15
    we're going to be using pip install I'm
  • 00:05:17
    going to install Apache airflow
  • 00:05:21
    now this is going to initialize the
  • 00:05:22
    airflow so we're going to start from
  • 00:05:23
    this guy
  • 00:05:24
    connected to this API level and we're
  • 00:05:27
    going to spin up this instance in a
  • 00:05:29
    little bit
  • 00:05:43
    now that the installation is done I'm
  • 00:05:45
    going to be
  • 00:05:48
    I'm going to be creating a new file in
  • 00:05:50
    our dag and I'll call it Kafka stream
  • 00:05:57
    this file is going to be where we are
  • 00:05:59
    going to be running the DAG from so
  • 00:06:02
    I'm going to be importing some packages
  • 00:06:03
    from airflow and some internal packages
  • 00:06:06
    so from date time
  • 00:06:09
    import date time
  • 00:06:11
    then from airflow I'm going to be
  • 00:06:14
    importing DAG
  • 00:06:17
    all right
  • 00:06:19
    the other thing that I'll be importing
  • 00:06:21
    from airflow is the python operator that
  • 00:06:23
    I'll be using
  • 00:06:24
    to fetch this data operators.python
  • 00:06:29
    import python operator
  • 00:06:33
    now I need a default argument which is
  • 00:06:35
    going to be my default args
  • 00:06:37
    that I'll be using to
  • 00:06:40
    attach to my dag itself so to know who
  • 00:06:43
    owns the project and some properties
  • 00:06:45
    that I'll be using so the owner is going
  • 00:06:48
    to be
  • 00:06:49
    going to just do airscholar
  • 00:06:52
    then it starts date
  • 00:06:55
    is going to be
  • 00:06:56
    the start date is going to be date time
  • 00:07:02
    that I just imported and that would be
  • 00:07:05
    2023 2023
  • 00:07:09
    uh
  • 00:07:11
    nine
  • 00:07:13
    three and uh I could just use 10 o'clock
  • 00:07:17
    it doesn't really matter what time it
  • 00:07:19
    says
  • 00:07:20
    all right so this is uh 2023
  • 00:07:23
    the ninth month, September 3 and then 10
  • 00:07:29
    o'clock
  • 00:07:31
    all right now that I have my default args
  • 00:07:34
    so I'm going to create my dag which is
  • 00:07:37
    going to be where uh where it's going to
  • 00:07:41
    serve as as an entry point so I'm going
  • 00:07:43
    to have with dag which is my dag I'm
  • 00:07:47
    going to call the task ID
  • 00:07:49
    uh the ID is going to be user automation
  • 00:07:54
    I'll call it user automation
  • 00:07:56
    all right and my default args, default args
  • 00:08:00
    is going to be
  • 00:08:02
    default args, the schedule interval is
  • 00:08:05
    going to be
  • 00:08:07
    daily, I'll call this @daily
  • 00:08:12
    and there is no need for catch up so
  • 00:08:14
    I'll just put catch up
  • 00:08:16
    False so this is going to be running as
  • 00:08:18
    our dag I'm going to minimize this for
  • 00:08:20
    now
  • 00:08:22
    then
  • 00:08:24
    we are going to have a python operator
  • 00:08:27
    I'm going to call this streaming task
  • 00:08:30
    okay it's going to be a python operator
  • 00:08:32
    and
  • 00:08:33
    I'll call the task ID
  • 00:08:36
    stream
  • 00:08:38
    data from
  • 00:08:40
    API
  • 00:08:42
    okay
  • 00:08:44
    then with python operator we need the
  • 00:08:46
    python callable which is going to be the
  • 00:08:49
    function that we're going to be calling
  • 00:08:50
    so I'll call this stream data function
  • 00:08:54
    so we don't have it so we need to create
  • 00:08:57
    a function so I'll call this the Stream
  • 00:09:02
    data
  • 00:09:03
    I'm going to be importing Json because
  • 00:09:06
    this is going to be what I'll be
  • 00:09:08
    formatting the response like us so what
  • 00:09:12
    I need to do is I I need to import
  • 00:09:14
    request
  • 00:09:15
    to get the data from uh request to get
  • 00:09:20
    your data from the API itself so I'm
  • 00:09:22
    going to have
  • 00:09:23
    a request
  • 00:09:26
    .get
  • 00:09:27
    I'm going to go into the API
  • 00:09:30
    in here
  • 00:09:32
    I'll just copy this URL
  • 00:09:36
    I'll paste it in here as my API there's
  • 00:09:39
    no other parameters to be set so it's
  • 00:09:41
    fine so what I'll do is I'll go back in
  • 00:09:44
    here my request is going to be uh saved
  • 00:09:47
    in response so I'm going to have
  • 00:09:50
    res.json so I'm going to have stream
  • 00:09:53
    data at this point so we can test run
  • 00:09:56
    this before running the
  • 00:09:58
    the DAG itself so res.json I'm going to
  • 00:10:02
    print in this
  • 00:10:05
    okay
  • 00:10:07
    so if I run this
  • 00:10:09
    by the time you know
  • 00:10:11
    I'm going to clear this up
  • 00:10:13
    and I do python dags and I call this
  • 00:10:16
    kafka
  • 00:10:18
    stream
  • 00:10:21
    so it should get me the data
  • 00:10:24
    from the API
  • 00:10:27
    uh yeah I'm going to comment this out
  • 00:10:33
    oh no
  • 00:10:35
    yeah I'll just comment that
  • 00:10:38
    okay and
  • 00:10:41
    I'll run this again
  • 00:10:43
    okay good so I have access to the
  • 00:10:46
    results from the API and as expected we
  • 00:10:49
    have the result which is coming in an
  • 00:10:51
    array
  • 00:10:52
    of data and the first record at this
  • 00:10:55
    point is this guy
  • 00:10:58
    the second part
  • 00:11:00
    is the info so we only need access to
  • 00:11:03
    this
  • 00:11:04
    to the results and this is results and
  • 00:11:06
    info we don't need this info for now we
  • 00:11:09
    need this result and we only need the
  • 00:11:11
    access to the first record which is this
  • 00:11:13
    Json the large Json that we have so to
  • 00:11:17
    put that in context we are going to be
  • 00:11:20
    getting the response somewhere here
  • 00:11:24
    and
  • 00:11:25
    I'm going to put that res equals to
  • 00:11:29
    res, I'm going to get the
  • 00:11:31
    results part of the data and I'll get
  • 00:11:34
    the first index which is this guy
  • 00:11:37
    from here the first index from that
  • 00:11:40
    so if I print this I'm going to put I'm
  • 00:11:43
    going to print this
  • 00:11:47
    okay and let's run this again
  • 00:11:51
    I have the gender and I think we can
  • 00:11:55
    still format this uh
  • 00:11:57
    nicely can't we I think we can do
  • 00:12:01
    json.dumps, we just dump the JSON of the
  • 00:12:04
    response and then we indent it as a
  • 00:12:07
    maybe three
  • 00:12:09
    if we do that
  • 00:12:10
    we'll run this again
  • 00:12:13
    I should run this again we shall have
  • 00:12:15
    something pretty nicely laid out for us
  • 00:12:17
    so we have the gender which is male
  • 00:12:20
    the location email login ID and all that
  • 00:12:25
    which is good I think this is a good
  • 00:12:27
    step forward before we continue to
  • 00:12:29
    ensure that the each of the task is fine
  • 00:12:33
    all right so I have the stream data but
  • 00:12:36
    this is not exactly what I want so what
  • 00:12:38
    I want is I'm going to separate this a
  • 00:12:41
    little bit so I have get data
  • 00:12:44
    get data
  • 00:12:46
    so I'm going to move all this guy
  • 00:12:48
    into this function and I'm going to
  • 00:12:51
    return at this point I'm going to return
  • 00:12:54
    response because this is what I need I
  • 00:12:57
    just need to get the API data
  • 00:13:00
    format it in Json
  • 00:13:02
    that's the right I get the response in
  • 00:13:04
    Json format and I get the results and
  • 00:13:07
    the first index of that result that's
  • 00:13:08
    what I'm doing with the get data
  • 00:13:10
    function
  • 00:13:13
    meanwhile
  • 00:13:14
    if I come back in here
  • 00:13:17
    I need to put this in in a
  • 00:13:21
    in a specific format okay because that's
  • 00:13:25
    what I want to put on my Kafka queue
  • 00:13:27
    right I need to have a format data where
  • 00:13:30
    I'm going to be formatting this
  • 00:13:31
    particular data that is coming in so I'm
  • 00:13:33
    going to have format data
  • 00:13:35
    so I'm going to get the response that I
  • 00:13:38
    get from the API
  • 00:13:40
    into this function so I'm going to have
  • 00:13:42
    a single
  • 00:13:44
    I have a variable called function this
  • 00:13:47
    object this is where I'm going to be
  • 00:13:49
    holding all these data
  • 00:13:51
    okay yeah it's a flute
  • 00:13:55
    so I can put this somewhere here
  • 00:14:00
    I'll just uh
  • 00:14:02
    put it on the side
  • 00:14:05
    and I should be able to see all the data
  • 00:14:10
    and won't go like that
  • 00:14:12
    yeah
  • 00:14:13
    so if I have this
  • 00:14:15
    I have the gender as male and so what
  • 00:14:19
    I need to do I need to extract this data
  • 00:14:21
    so what I what I want to do is get the
  • 00:14:23
    I'll start with the the uh the first
  • 00:14:26
    name so I'm going to have the data
  • 00:14:28
    first name
  • 00:14:30
    which is going to be coming from the
  • 00:14:33
    name Let's see we have the name which is
  • 00:14:36
    the first and the last name right so
  • 00:14:39
    what I want to do is I want to get a
  • 00:14:41
    name
  • 00:14:42
    and I want to get the first name
  • 00:15:15
    res equals to format data and I'll
  • 00:15:18
    put in res
  • 00:15:20
    and let's see if I I have the right
  • 00:15:23
    results
  • 00:15:25
    if I run this again
  • 00:15:27
    I should have
  • 00:15:30
    yeah it says picture
  • 00:15:32
    yeah instead of data it should be res of
  • 00:15:35
    course
  • 00:15:37
    that's a typo
  • 00:15:39
    now we have a more streamlined data
  • 00:15:42
    which is more of what we need we have
  • 00:15:45
    the first name for this person and if
  • 00:15:48
    you look at this data that we have
  • 00:15:51
    yeah
  • 00:15:53
    if you look at the data this is the
  • 00:15:55
    picture of the guy that we have which is
  • 00:15:58
    fine that's what we need
  • 00:16:01
    as of this moment so the next thing we
  • 00:16:04
    need to do is we need to add Kafka
  • 00:16:07
    teacher to this well before we do that
  • 00:16:09
    we need to set up our Kafka this is just
  • 00:16:12
    a
  • 00:16:13
    this is just a way for us to get access
  • 00:16:15
    to this API so the next thing we are
  • 00:16:18
    going to do
  • 00:16:20
    these are going to set up Apache airflow
  • 00:16:23
    on the docker container right now I just
  • 00:16:25
    installed Apache airflow it has
  • 00:16:28
    not been doing anything on my system so
  • 00:16:30
    we need to set up a Docker compose file
  • 00:16:33
    that is going to be
  • 00:16:35
    initializing this Apache air flow and
  • 00:16:38
    Kafka the schema registry and the rest
  • 00:16:41
    okay
  • 00:16:43
    now so what we need to do is we need to
  • 00:16:46
    in our files here
  • 00:16:49
    I'll just put this back
  • 00:16:51
    where it's supposed to be and I'll just
  • 00:16:53
    minimize this so I'll just come in here
  • 00:16:56
    in our root directory I'll create a new
  • 00:16:58
    file I'll call it Docker file Docker
  • 00:17:01
    compose
  • 00:17:03
    because that's what we want we want to
  • 00:17:05
    we we we don't need a Docker file really
  • 00:17:08
    what we need is a Docker compose so we
  • 00:17:10
    can just spin up and spin down at any
  • 00:17:12
    point in time we want so which is fine
  • 00:17:15
    so I'm going to start with the Zookeeper
  • 00:17:17
    so we can just put in this architecture
  • 00:17:19
    pretty nicely so it looks like the
  • 00:17:22
    dependencies from Apaches zookeeper down
  • 00:17:25
    to the Kafka Kafka is Con connected to
  • 00:17:28
    the control center then the schema
  • 00:17:30
    registry and this guy is stand alone
  • 00:17:32
    because it's not connected to the
  • 00:17:34
    Confluence architecture we're going to
  • 00:17:36
    be using the golf plate systems here
  • 00:17:38
    then we're going to have a separate
  • 00:17:40
    system which is going to be discard the
  • 00:17:43
    Apache spark and a separate system for
  • 00:17:45
    Cassandra so so let's see how that is
  • 00:17:48
    going to happen
  • 00:18:03
    now this should prepare a zookeeper
  • 00:18:07
    ready for operation so this is basic
  • 00:18:10
    stuff and if you need to do some more
  • 00:18:15
    detailed configuration of the zookeeper
  • 00:18:17
    so you maybe you have some special
  • 00:18:19
    parameters that you want to to look into
  • 00:18:22
    I think it may be best to consult the
  • 00:18:25
    documentation so on the different
  • 00:18:27
    environment variables and our best to
  • 00:18:29
    tweak this configurations but but for
  • 00:18:32
    now this should do for us
  • 00:18:33
    in our in our case so we're going to
  • 00:18:36
    have a broker
  • 00:18:37
    and I'll call the image name this is
  • 00:18:39
    going to come from confluentinc also
  • 00:18:42
    and this is going to be CP server so
  • 00:18:44
    this is going to be a broker
  • 00:18:46
    uh if you check the docker up you should
  • 00:18:49
    see so right now we just ticked off
  • 00:18:52
    Apache zookeeper so the next one we are
  • 00:18:55
    working on is this Kafka
  • 00:18:59
    the server
  • 00:19:40
    so that passes for uh
  • 00:19:43
    uh our broker guy so we need to go now
  • 00:19:46
    for the schema registry
  • 00:20:10
    and uh
  • 00:20:12
    I think for the dependence the only
  • 00:20:14
    dependency we have left is the control
  • 00:20:16
    center
  • 00:20:17
    but technically the control center is
  • 00:20:20
    dependent on the schema registry because
  • 00:20:23
    the Avro schemas that this the
  • 00:20:26
    Avro schema that the schema registry
  • 00:20:29
    allows the Kafka to visualize
  • 00:20:32
    on the UI it's going to be it's going to
  • 00:20:35
    be a dependencies on the schema registry
  • 00:20:37
    so technically this control center is
  • 00:20:40
    listening for events on schema registry
  • 00:20:43
    to visualize the data directly on Kafka
  • 00:20:46
    which is managed by zookeeper so that's
  • 00:20:49
    the dependency Dynamics anyways so we
  • 00:20:52
    just need to add the control center
  • 00:21:09
    so we don't have any engines here but if
  • 00:21:12
    you look at the images I already have
  • 00:21:14
    some images
  • 00:21:15
    installed already I already downloaded
  • 00:21:17
    the
  • 00:21:19
    the images from
  • 00:21:23
    from the confluence
  • 00:21:27
    Docker help so all I need to do is do
  • 00:21:30
    Docker compose up
  • 00:21:33
    and I'll do in the dash mode
  • 00:21:37
    so this is going to create so for some
  • 00:21:39
    if this is your first time without these
  • 00:21:42
    images what it's going to do is going to
  • 00:21:45
    pull those images down to your system
  • 00:21:46
    but because I already done that so
  • 00:21:49
    that's why you're not seeing the pulling
  • 00:21:51
    so it's I already have the images and
  • 00:21:53
    it's going to just spin up the
  • 00:21:54
    containers from those images directly
  • 00:21:58
    so now this is what I was talking about
  • 00:22:00
    the other time, the zookeeper, it's only when
  • 00:22:03
    it was healthy that the broker started and
  • 00:22:06
    you can see these guys are still created
  • 00:22:08
    waiting for the broker to be held before
  • 00:22:10
    it continues if you look at the UI they
  • 00:22:12
    are not they are not started now that
  • 00:22:15
    this guy is uh is running is because the
  • 00:22:18
    broker says it is ready to accept
  • 00:22:20
    connection
  • 00:22:22
    so yeah so that's the dependency level
  • 00:22:24
    until one is done before the other uh
  • 00:22:28
    continue so if there's any error at this
  • 00:22:30
    point
  • 00:22:31
    we we need to check
  • 00:22:33
    to be sure that the environment is ready
  • 00:22:36
    and ready to accept connection before we
  • 00:22:39
    continue
  • 00:22:40
    so we can fix any error
  • 00:22:42
    and uh
  • 00:22:44
    continue
  • 00:22:47
    so we have the schema registry I think
  • 00:22:50
    it's going to take the most time because
  • 00:22:52
    it's uh
  • 00:22:54
    I think it's 30 seconds because you know
  • 00:22:56
    yeah it's 30 seconds so it's going to be
  • 00:22:58
    checking every 30 seconds
  • 00:23:01
    and the timeout is 10 seconds maybe I
  • 00:23:03
    should have reduced that but right now
  • 00:23:05
    this uh
  • 00:23:07
    let's see
  • 00:23:13
    and these are warnings we should be fine
  • 00:23:18
    we're waiting for yeah the schema
  • 00:23:21
    registry is okay now and the control
  • 00:23:22
    center is coming up so which is okay uh
  • 00:23:26
    the control center also is started which
  • 00:23:28
    is fine I think at this point
  • 00:23:30
    uh we are done with this part we only
  • 00:23:34
    need to confirm on the UI that the
  • 00:23:37
    environment is up and running and that
  • 00:23:40
    is ready to accept connections so what
  • 00:23:42
    we need to do
  • 00:23:43
    if you just click on this guy or go to
  • 00:23:46
    localhost 9021
  • 00:23:49
    I think we are a little bit early so
  • 00:23:52
    let's check
  • 00:23:55
    yeah the control center is still
  • 00:23:56
    starting off
  • 00:24:01
    we just wait till it's done
  • 00:24:04
    he says started Network server and this
  • 00:24:07
    is ready
  • 00:24:08
    okay so if we refresh this page we
  • 00:24:11
    should be good
  • 00:24:14
    now this is our control center
  • 00:24:18
    so the importance of those connections
  • 00:24:19
    that we did uh is to be sure that we
  • 00:24:23
    have the right Brokers and we can
  • 00:24:25
    visualize the uh
  • 00:24:27
    the data right now we don't need any
  • 00:24:30
    connections to K SQL DB or connect which
  • 00:24:33
    is fine
  • 00:24:34
    now we may do that in the next in a
  • 00:24:37
    different tutorial but not in this
  • 00:24:39
    tutorial we're good with classical DB
  • 00:24:41
    and the rest
  • 00:24:42
    but let's focus on the broker which is
  • 00:24:45
    the most important thing here and there
  • 00:24:46
    we can see the topics and the production
  • 00:24:48
    and consumption so if you go in here we
  • 00:24:51
    have the UI the broker the topics the
  • 00:24:54
    connect we don't have anything there and
  • 00:24:56
    if you click the Brokers these are the
  • 00:24:57
    this is what has been produced
  • 00:25:00
    in the last
  • 00:25:02
    in the last few seconds which is 24
  • 00:25:05
    kilobytes and 24 bytes per second and
  • 00:25:08
    the rest so this is uh and the Zookeeper
  • 00:25:10
    is connected you can see the Zookeeper
  • 00:25:12
    is connected
  • 00:25:14
    um self balancing is on yeah and the
  • 00:25:16
    rest is fine so we only have we have a
  • 00:25:18
    good broker name in here the bytes in bytes
  • 00:25:21
    out and the rest and these are the other
  • 00:25:23
    properties so what we really uh want is
  • 00:25:26
    this topic
  • 00:25:28
    as of this moment we don't have any
  • 00:25:30
    topics created yet so we don't need to
  • 00:25:33
    create anything we we do that
  • 00:25:35
    automatically by the time we start
  • 00:25:36
    publishing data into it so let's
  • 00:25:39
    continue so that's the first uh this the
  • 00:25:41
    second part anyways so if you connect to
  • 00:25:44
    on our Kafka stream
  • 00:25:46
    if you go back to a Kafka stream at this
  • 00:25:48
    point
  • 00:25:49
    which is what we have so we need to
  • 00:25:51
    connect to the Kafka queue so we can see
  • 00:25:54
    we just maybe publish one or two data
  • 00:25:57
    into the Kafka queue and we'll see if it
  • 00:25:59
    is working or not
  • 00:26:00
    so we need to install a package for that
  • 00:26:02
    which is going to be peep install Kafka
  • 00:26:05
    python by the time you get this data
  • 00:26:08
    from
  • 00:26:10
    the randomuser.me API we format the
  • 00:26:13
    data right which is here and now we are
  • 00:26:16
    dumping the data so we don't need to
  • 00:26:18
    dump it for now we just uh go ahead and
  • 00:26:22
    publish this data so we come in here
  • 00:26:25
    I'm going to import Kafka I mean from
  • 00:26:29
    Kafka
  • 00:26:32
    producer
  • 00:26:37
    uh and I think I need to import the time
  • 00:26:39
    because what we want to do is we want to
  • 00:26:42
    be producing for a set amount of
  • 00:26:44
    time when we start producing
  • 00:26:47
    so uh let's see
  • 00:26:50
    uh I just want to send a single data to
  • 00:26:52
    the producer so let's initialize the
  • 00:26:54
    producer at this point so we have a
  • 00:26:56
    producer to be Kafka
  • 00:26:58
    producer the bootstrap server
  • 00:27:02
    bootstrap
  • 00:27:05
    servers
  • 00:27:07
    it's going to be we are connecting to
  • 00:27:09
    the broker
  • 00:27:11
    on 29092 however because we are not yet
  • 00:27:15
    on the on the docker containers of this
  • 00:27:18
    time we need to use the uh the external
  • 00:27:21
    IP address which is the localhost and
  • 00:27:25
    port 9092
  • 00:27:28
    uh I think we should set the max timeout
  • 00:27:32
    the max block
  • 00:27:35
    Ms which is the timeout it's going to be
  • 00:27:38
    55 seconds
  • 00:27:40
    all right
  • 00:27:43
    okay so let's publish and push the data
  • 00:27:46
    that we get
  • 00:27:47
    from this guy we push it to the queue so
  • 00:27:50
    we have producer
  • 00:27:53
    dot send
  • 00:27:55
    we have
  • 00:27:57
    users
  • 00:27:59
    created
  • 00:28:00
    and we do a json.doms we just dump the
  • 00:28:04
    data and then code it in UTF
  • 00:28:07
    utf-8
  • 00:28:10
    encoding
  • 00:28:13
    ah not data Express yeah
  • 00:28:16
    okay I think this should get us uh
  • 00:28:20
    data to the queue let's see
  • 00:28:23
    I'm going to do a python Kafka stream
  • 00:28:26
    dot Pi so let's see if it is able to
  • 00:28:28
    push data to the queue are we able to
  • 00:28:30
    connect
  • 00:28:31
    to the queue it says let's see
  • 00:28:36
    connecting to this guy
  • 00:28:38
    proving this connecting connection
  • 00:28:40
    complete so we are able to connect and
  • 00:28:43
    push data so we come back into the car
  • 00:28:46
    into the topic and we refresh
  • 00:28:57
    excellent so we have a users created
  • 00:29:00
    and we have data on the queue right
  • 00:29:02
    now because we're not listening to this
  • 00:29:03
    we can't see anything so if we run this
  • 00:29:06
    again
  • 00:29:10
    if I run this again we should see new
  • 00:29:12
    messages coming in
  • 00:29:17
    let's see
  • 00:29:20
    good so this is the data that is coming
  • 00:29:23
    on task Creek guy the first name is
  • 00:29:26
    Wyatt Willis on task Creek Australia and
  • 00:29:30
    the rest
  • 00:29:31
    if you look at this guy
  • 00:29:34
    uh we didn't print it out anyways but
  • 00:29:37
    that's the data that is coming in so we
  • 00:29:39
    are able to push the data to the queue
  • 00:29:41
    and that's a good win for us so the next
  • 00:29:43
    thing for is to sell so we have we have
  • 00:29:47
    the
  • 00:29:49
    we have the Zookeeper connected to Kafka
  • 00:29:52
    successfully we're able to use the
  • 00:29:55
    schema registry Avro schema to visualize
  • 00:29:58
    the data on the topic which is fine so
  • 00:30:00
    this part of the system is good now we
  • 00:30:04
    need to push data from
  • 00:30:07
    airflow to postgres I mean from airflow
  • 00:30:11
    to Kafka not postgres so what we need to
  • 00:30:14
    do now is initialize our workflow our
  • 00:30:16
    airflow with progress server so we just
  • 00:30:20
    come back in here to reduce the the
  • 00:30:22
    workload
  • 00:30:25
    ah so we come back to our python okay
  • 00:30:30
    in here we converted Docker compose just
  • 00:30:34
    minimize this for now
  • 00:30:36
    uh right now we need to add our
  • 00:30:41
    we need to add our web server
  • 00:31:00
    and that's it so the final thing we need
  • 00:31:03
    to do before we spin this up is to add
  • 00:31:05
    our scripts for the entry point so we
  • 00:31:08
    need to have a directory called
  • 00:31:10
    a script
  • 00:31:12
    in our script we're going to have entry
  • 00:31:14
    points
  • 00:31:15
    dot sh and in this entry point.sh what
  • 00:31:19
    we want to do is we want to write the
  • 00:31:20
    sequence of command that airflow should
  • 00:31:24
    follow when it's trying to initialize
  • 00:31:26
    the web server or the scheduler itself
  • 00:31:29
    so we need to have a bin bash
  • 00:31:48
    this is our entry point.sh so we need to
  • 00:31:50
    add that to our startup volume which is
  • 00:31:54
    going to be inside the script
  • 00:31:56
    entry point dot sh
  • 00:32:01
    to be synchronized into opt airflow
  • 00:32:07
    is it Scripts
  • 00:32:09
    entry point
  • 00:32:12
    dot sh and the entry point file is going
  • 00:32:16
    to be pointing to
  • 00:32:18
    opt
  • 00:32:20
    airflow
  • 00:32:21
    script entry point
  • 00:32:24
    it's not script see script entry point
  • 00:32:28
    sh yeah
  • 00:32:33
    and uh
  • 00:32:34
    because we are connecting to postgres we
  • 00:32:37
    need to set up our postgres because
  • 00:32:40
    right now what we are doing basically is
  • 00:32:43
    to uh SQLAlchemy, SQLAlchemy
  • 00:32:46
    yeah
  • 00:32:48
    SQLAlchemy
  • 00:32:52
    and I think this is usually
  • 00:32:55
    double underscore airflow the one that's
  • 00:32:58
    called that yeah
  • 00:33:00
    what am I missing uh sequential yeah I
  • 00:33:04
    think this should be fine and the last
  • 00:33:07
    part is going to be the
  • 00:33:09
    is going to be the postgres which is uh
  • 00:33:12
    where is it
  • 00:33:14
    yeah
  • 00:33:16
    yeah I need to just initialize the
  • 00:33:18
    postgres which is uh postgres
  • 00:33:25
    so what is on the same network
  • 00:33:27
    one other thing that is left is the
  • 00:33:30
    scheduler which is not initialized right
  • 00:33:33
    now so what we need to do is to write a
  • 00:33:36
    simple script for this scheduler we are
  • 00:33:39
    going to have a scheduler
  • 00:33:51
    so let's see if there's any error so we
  • 00:33:54
    can quickly debug and continue our
  • 00:33:56
    coding
  • 00:34:03
    the web server is running and let's see
  • 00:34:05
    if it is working as expected
  • 00:34:08
    okay good uh it loaded the the
  • 00:34:12
    requirement requirements txt and it's
  • 00:34:14
    running it at this point which is good I
  • 00:34:17
    think this this shows that uh
  • 00:34:19
    environment is properly set up
  • 00:34:22
    and the dag is running as expected
  • 00:34:24
    good
  • 00:34:32
    so uh web server is running at 8080
  • 00:34:35
    because the web server is running I
  • 00:34:38
    think what we said in our Docker compose
  • 00:34:42
    was to be checking every every 30
  • 00:34:45
    seconds
  • 00:34:47
    yeah he's going to be checking every 30
  • 00:34:49
    seconds so even though it's ready
  • 00:34:52
    and we just need to wait for like 30
  • 00:34:53
    seconds before this guy picks up
  • 00:34:56
    and while that is while that is running
  • 00:34:59
    I think we can go into our localhost
  • 00:35:02
    and go into 8080
  • 00:35:05
    and see if our airflow is properly set
  • 00:35:08
    up so airflow is running and we have a
  • 00:35:11
    admin which is our admin admin
  • 00:35:14
    the sign in
  • 00:35:16
    we should be able to so yes good the
  • 00:35:19
    airflow is uh I mean is running as
  • 00:35:22
    expected but there is no scheduler
  • 00:35:25
    even though we are using a sequential
  • 00:35:27
    executor which is not advisable for
  • 00:35:30
    production because it's going to be
  • 00:35:31
    running your task one after the other if
  • 00:35:33
    you want to run tasks in parallel and uh
  • 00:35:36
    in a more scalable way
  • 00:35:38
    uh if you don't use this you should use
  • 00:35:40
    something like a celery executor or
  • 00:35:42
    something like that
  • 00:35:44
    so what is happening
  • 00:35:49
    websitis on LV just
  • 00:35:52
    fix this
  • 00:35:55
    and the scheduler should be up
  • 00:36:00
    right the scheduler is also up
  • 00:36:03
    and it's uh started so it's doing the
  • 00:36:06
    same installation because we need to
  • 00:36:08
    install airflow on the scheduler too so
  • 00:36:12
    let's see if it is
  • 00:36:14
    also going to be
  • 00:36:16
    running as expected so while this is uh
  • 00:36:18
    the installation is going on so we don't
  • 00:36:20
    waste too much time
  • 00:36:21
    we just proceed to our Kafka stream and
  • 00:36:25
    we we fine-tune this and we can set up
  • 00:36:27
    our dag so we can start seeing some
  • 00:36:29
    stuff and movement on the UI so uh come
  • 00:36:32
    back in here
  • 00:36:33
    and minimize this
  • 00:36:35
    uh so I have this guy and I just
  • 00:36:38
    uncomment this
  • 00:36:41
    I I don't need the stream data I know
  • 00:36:44
    it's working I can see some data pushed
  • 00:36:46
    to the queue and this is my data at this
  • 00:36:50
    point
  • 00:36:51
    yeah
  • 00:36:53
    however we need to fine-tune this now
  • 00:36:55
    that this guy will be running on the on
  • 00:36:57
    the docker instance so we need to change
  • 00:36:59
    this, this to broker and then it's
  • 00:37:02
    going to be uh broker at 29092
  • 00:37:05
    to use the internal IP address
  • 00:37:10
    hmm
  • 00:37:12
    I'm going to take a quick pause for the
  • 00:37:14
    scheduler to be done and then I'll
  • 00:37:16
    resume once it is done
  • 00:37:20
    now that the scheduler is
  • 00:37:23
    done initializing I think the next thing
  • 00:37:25
    for us to do
  • 00:37:27
    is to check uh the UI at this point
  • 00:37:31
    so let's see if the UI is going to be
  • 00:37:33
    updated so we just need to refresh
  • 00:37:38
    now the error the warning message is
  • 00:37:41
    gone and we can see our user automation
  • 00:37:43
    doc
  • 00:37:45
    that's a good one so now that we have
  • 00:37:47
    our user automation Dag you can see
  • 00:37:51
    if the the grid is initialized properly
  • 00:37:58
    good so we have stream data from API
  • 00:38:01
    which is good so the next thing we want
  • 00:38:03
    to do is we want to switch the um we
  • 00:38:07
    want to make sure that we are able to
  • 00:38:09
    stream data directly from
  • 00:38:11
    the random user API directly into Kafka
  • 00:38:15
    as much as possible so instead of just
  • 00:38:19
    one producing that we're doing we want
  • 00:38:21
    to be producing all the data that are
  • 00:38:24
    going to be sent from random user into
  • 00:38:27
    Kafka directly so we just need to modify
  • 00:38:29
    this particular
  • 00:38:31
    this particular script this function
  • 00:38:34
    will be updated
  • 00:38:36
    so you have to get data from our data
  • 00:38:38
    we're sending data to Kafka at this
  • 00:38:40
    point
  • 00:38:42
    now so instead of just this part we need
  • 00:38:45
    to remove the Kafka producer maybe oh
  • 00:38:50
    in here
  • 00:38:58
    and then we just do oh well
  • 00:39:02
    here
  • 00:39:03
    so we have we've imported time
  • 00:39:06
    so what we want to do is we want to
  • 00:39:09
    finish that part which is if time time
  • 00:39:16
    we need to get the current time
  • 00:39:21
    so we just uh this this every time
  • 00:39:27
    so what we're going to do is we want to
  • 00:39:29
    stream for like two minutes or even five
  • 00:39:33
    minutes or six one minute I think we're
  • 00:39:35
    going to be producing rapidly so what we
  • 00:39:37
    want to do is we want to get the current
  • 00:39:38
    time
  • 00:39:43
    we just break this Loop otherwise
  • 00:39:47
    if it is not
  • 00:39:50
    greater than one minute what you want to
  • 00:39:52
    do is once you get the data
  • 00:39:55
    from here so we want to move this guy
  • 00:40:00
    from here to this place
  • 00:40:04
    so we want to get the data
  • 00:40:06
    format it and then
  • 00:40:09
    maybe we just
  • 00:40:11
    producer
  • 00:40:13
    we already have a script down here
  • 00:40:16
    so we just move this so
  • 00:40:20
    so what we are doing here is while the
  • 00:40:23
    time is between one minute so while we
  • 00:40:26
    are producing within uh
  • 00:40:28
    from 0 to 60 Seconds
  • 00:40:30
    what I wanted is we want to as many
  • 00:40:32
    times as possible send requests to
  • 00:40:35
    random user.me API get the data format
  • 00:40:38
    that data and send it
  • 00:40:40
    to the queue itself so we just have a
  • 00:40:44
    exception sorry accept uh exception
  • 00:40:50
    as e
  • 00:40:51
    and then we just log that we just import
  • 00:40:56
    logging
  • 00:40:59
    so we just uh log that part just logging
  • 00:41:04
    if I can only spell logging
  • 00:41:07
    an error occurred
  • 00:41:10
    and the error
  • 00:41:12
    is
  • 00:41:21
    the error is e
  • 00:41:25
    okay
  • 00:41:27
    and what we want to do is we want to
  • 00:41:30
    continue so even if maybe there is a
  • 00:41:32
    network downtime for like a two or two
  • 00:41:34
    seconds or 30 seconds it doesn't matter
  • 00:41:37
    we just log that particular error and
  • 00:41:40
    continue the loop and once it is one
  • 00:41:42
    minute we break
  • 00:41:43
    that's what we want to do so that's uh
  • 00:41:46
    that's that with this uh adjustment at
  • 00:41:49
    this point so what we want to do is we
  • 00:41:51
    want to go back to the UI
  • 00:41:53
    refresh and Trigger the dag from the UI
  • 00:41:58
    yeah it's done so what we can do is we
  • 00:42:01
    trigger this we just turn it on
  • 00:42:09
    yeah okay so it's not triggering
  • 00:42:11
    automatically because we didn't enable
  • 00:42:13
    catch up so I just turn it on I just
  • 00:42:16
    triggered it and the
  • 00:42:19
    user automation should start any moments
  • 00:42:22
    now do we have anything in the logs I
  • 00:42:25
    don't think so so but let's let's follow
  • 00:42:28
    the production on the
  • 00:42:31
    the Kafka UI I'm going to just
  • 00:42:33
    refresh this page
  • 00:42:37
    okay so we are listening for new events
  • 00:42:40
    on the on the UI
  • 00:42:43
    just waiting for this to get triggered
  • 00:42:46
    okay so it's running now
  • 00:42:52
    once you start seeing data drop good so
  • 00:42:55
    data is dropping on the UI and it's
  • 00:42:57
    going to keep dropping for at least one
  • 00:43:00
    minute
  • 00:43:01
    we can trigger the
  • 00:43:05
    the list from here
  • 00:43:07
    and you can see the the offset number
  • 00:43:11
    and the data that are coming in anyways
  • 00:43:13
    good so while this is going on so that
  • 00:43:16
    means uh at this point all our data is
  • 00:43:20
    fine so we want to write the
  • 00:43:24
    if you will check our UI so we've taken
  • 00:43:27
    care of this part
  • 00:43:28
    and we've taken care of the architecture
  • 00:43:30
    so we want to set up the spark
  • 00:43:32
    architecture and the Cassandra which are
  • 00:43:35
    the other parts that is left that are
  • 00:43:37
    left so we have the master-worker
  • 00:43:39
    architecture uh for this uh for the
  • 00:43:43
    purpose of this session I will just use
  • 00:43:45
    one uh worker and I'll show you how to
  • 00:43:48
    add the other workers if you want to add
  • 00:43:50
    them
  • 00:43:51
    and then we'll go from there
  • 00:43:54
    we just go back to our Docker compose
  • 00:43:57
    in here I'll just copy paste the
  • 00:44:00
    master-worker architecture and I'll just
  • 00:44:02
    talk through them
  • 00:44:04
    so here is the master worker
  • 00:44:06
    architecture and
  • 00:44:08
    what I'll do is I'll just quickly talk
  • 00:44:10
    through them so this spark Master is
  • 00:44:13
    coming from the spark latest image so the
  • 00:44:15
    command to start this master is to go
  • 00:44:18
    into the bin spark class and deploy the
  • 00:44:20
    master so it's going to expose this on
  • 00:44:23
    the port of 9090 and there's going to
  • 00:44:25
    have another port of 7077 so this is
  • 00:44:28
    where the master will be communicate
  • 00:44:30
    with the workers themselves so if you
  • 00:44:32
    have multiple workers they'll be
  • 00:44:34
    communicating on 7077 with the
  • 00:44:38
    master while if you want to access the
  • 00:44:40
    UI you will use port 9090
  • 00:44:44
    of course the they have to be on the
  • 00:44:46
    same network so they can work correctly
  • 00:44:48
    for the worker so if you have multiple
  • 00:44:51
    worker we just replicate this okay we
  • 00:44:55
    replicate this as one and then we call
  • 00:44:57
    let me show you
  • 00:44:59
    so if I copy this
  • 00:45:02
    I'll just change the name
  • 00:45:04
    from
  • 00:45:07
    spark worker I'll just add maybe spark
  • 00:45:10
    worker 2
  • 00:45:12
    somewhere something like this and they
  • 00:45:15
    have the same uh
  • 00:45:17
    configuration but the dependency that it
  • 00:45:21
    they have against the master is going to
  • 00:45:24
    be the same same spot class but instead
  • 00:45:26
    of the master uh it's going to be
  • 00:45:29
    instead of the master here all of them
  • 00:45:31
    are going to have workers so if you have
  • 00:45:33
    more than one worker you can just um
  • 00:45:36
    have a the same class for them but
  • 00:45:39
    communicating with the master on the
  • 00:45:41
    same port uh the the same container name
  • 00:45:44
    and the port so yeah that's how to add
  • 00:45:47
    multiple workers but for the sake of the
  • 00:45:50
    speed I'm going to just leave it at one
  • 00:45:53
    and also these are the environment
  • 00:45:56
    variable for the um
  • 00:45:58
    for the worker so we need to have two
  • 00:46:00
    cores and a minimum of one gig so if you
  • 00:46:03
    reduce this one gig to maybe something
  • 00:46:04
    like 500 you are going to have an error
  • 00:46:08
    usually you have a minimum of one gig
  • 00:46:10
    for the worker because they are doing
  • 00:46:12
    the most of the work and
  • 00:46:13
    yeah so I tested this with something
  • 00:46:16
    less than one gig and I was having an
  • 00:46:18
    error so it's I think it's best to have
  • 00:46:19
    a minimum of one gig so you can also
  • 00:46:21
    check the documentation for more
  • 00:46:23
    information on that yeah now for the
  • 00:46:25
    Cassandra DB we have a we're going to be
  • 00:46:28
    using the latest the Cassandra so the
  • 00:46:31
    container name and the hostname are the
  • 00:46:32
    same we're exposing that on 9042 the
  • 00:46:36
    maximum heap size is 512 and the rest so
  • 00:46:39
    the username and password is Cassandra
  • 00:46:40
    Cassandra
  • 00:46:42
    so this is going to create and mount a
  • 00:46:44
    volume locally called Cassandra because
  • 00:46:47
    of the host name it's going to mount it
  • 00:46:49
    and this if you want to get any data
  • 00:46:51
    into Cassandra you can do that through
  • 00:46:53
    that point it's not necessary we don't
  • 00:46:55
    need any volume really
  • 00:46:58
    and that's it they have to be running on
  • 00:47:00
    the same network so if we do a Docker
  • 00:47:03
    compose up
  • 00:47:04
    now we should have
  • 00:47:06
    uh
  • 00:47:08
    composed of detach we should have
  • 00:47:12
    he says
  • 00:47:14
    must be a strings postgres Network
  • 00:47:17
    must be a string let's see services
  • 00:47:20
    postgres
  • 00:47:22
    Services yeah we don't need this
  • 00:47:26
    just added when I mistakenly press the
  • 00:47:28
    entire the other time
  • 00:47:33
    all right so Cassandra is running
  • 00:47:37
    and if you look at this we have a
  • 00:47:41
    if you do docker ps
  • 00:47:45
    we have
  • 00:47:48
    we have the Zookeeper, the schema
  • 00:47:50
    registry
  • 00:47:51
    Let's see we have the scheduler yeah we
  • 00:47:54
    have the spark worker which is started
  • 00:47:57
    and we have Cassandra the spark master
  • 00:48:00
    yeah so the spark master and the worker
  • 00:48:03
    I hope now 20 data engineering
  • 00:48:07
    I'm going to create a new file
  • 00:48:14
    I'll call it
  • 00:48:17
    spark stream
  • 00:48:19
    Dot py
  • 00:48:22
    all right
  • 00:48:25
    but if spark stream.py this is where
  • 00:48:29
    we're going to be writing our
  • 00:48:31
    our code for for spark streaming now we
  • 00:48:36
    have all the other dependencies working
  • 00:48:38
    so what we will need to do is we
  • 00:48:40
    need to install the Cassandra
  • 00:48:42
    driver,
  • 00:48:43
    and PySpark as well. So we do a pip
  • 00:48:51
    install
  • 00:48:53
    Cassandra driver
  • 00:48:54
    so we can communicate with Cassandra
  • 00:48:58
    and then what we need to do with our
  • 00:49:01
    installation is Spark
  • 00:49:03
    and PySpark,
  • 00:49:05
    so we need to install the spark and py-
  • 00:49:08
    spark dependencies.
  • 00:49:24
    Now PySpark and Spark are installing;
  • 00:49:27
    I'm going to continue coding while we
  • 00:49:29
    wait for the installation to be done
  • 00:49:37
    all right, so now that all of the
  • 00:49:39
    dependencies are done, I think
  • 00:49:42
    we can continue. From pyspark.sql we are importing
  • 00:49:45
    SparkSession,
  • 00:49:48
    and from this same package we are
  • 00:49:50
    going to be importing some functions:
  • 00:49:53
    we're going to be importing
  • 00:49:55
    from_json, col and the struct types, so
  • 00:50:00
    let's start with those ones for now.
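As a rough sketch, the imports this part of spark_stream.py relies on would look roughly like this (the exact set of helpers is an assumption based on what gets used later in the walkthrough):

```python
# Sketch of the imports assumed for spark_stream.py
import logging

from cassandra.cluster import Cluster                    # Cassandra driver client
from pyspark.sql import SparkSession                     # Spark entry point
from pyspark.sql.functions import from_json, col         # JSON parsing helpers
from pyspark.sql.types import StructType, StructField, StringType  # schema types
```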
  • 00:50:02
    So what we want to do is create a key
  • 00:50:06
    space on Cassandra before we do anything
  • 00:50:10
    because the keyspace is like a schema;
  • 00:50:12
    for people who know SQL, Postgres
  • 00:50:17
    and the rest, it's just like your public schema: you
  • 00:50:19
    need to have a key space and then in
  • 00:50:20
    each key space you have multiple tables
  • 00:50:23
    all right so we are doing a create key
  • 00:50:25
    space
  • 00:50:27
    which is going to be done with the session.
  • 00:50:29
    I'll just put a pass in here, so we
  • 00:50:32
    create a keyspace here, but we need to
  • 00:50:36
    create a connection. So then def
  • 00:50:38
    create_table,
  • 00:50:40
    with a session so we are going to be
  • 00:50:42
    creating a table here
  • 00:50:46
    and then we need to insert the data
  • 00:50:49
    that we are fetching from Kafka:
  • 00:50:51
    insert_data,
  • 00:50:54
    so we are going to be getting the
  • 00:50:55
    session of course, and then
  • 00:50:57
    kwargs, which is
  • 00:51:00
    then we do the insertion here
  • 00:51:04
    and then we need to
  • 00:51:06
    establish connections with Kafka,
  • 00:51:09
    Spark and Cassandra. So we need to,
  • 00:51:13
    so let's say create
  • 00:51:17
    spark connection here
  • 00:51:20
    and then we are going to be creating
  • 00:51:24
    spark connection
  • 00:51:28
    create uh Cassandra
  • 00:51:31
    connection
  • 00:51:33
    okay
  • 00:51:36
    and then we'll be creating
  • 00:51:39
    Cassandra
  • 00:51:41
    connection all right so let's start
  • 00:51:44
    implementing this and then we work our
  • 00:51:46
    way up so once we have a connection to
  • 00:51:48
    Cassandra
  • 00:51:49
    from there
  • 00:51:51
    we create a connection to spark and then
  • 00:51:54
    we create the key space the table and
  • 00:51:58
    the rest.
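A minimal sketch of the skeleton being laid out here, with placeholder bodies that get filled in further down (the function and parameter names are assumptions that follow the description above):

```python
def create_keyspace(session):
    # CREATE KEYSPACE IF NOT EXISTS ... (implemented further down)
    pass

def create_table(session):
    # CREATE TABLE IF NOT EXISTS ... (implemented further down)
    pass

def insert_data(session, **kwargs):
    # INSERT the record fetched from Kafka (implemented further down)
    pass

def create_spark_connection():
    # Build the SparkSession with the Cassandra and Kafka packages
    pass

def create_cassandra_connection():
    # Connect to the Cassandra cluster and return a session
    pass
```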
  • 00:51:59
    So let's start with the main function: if
  • 00:52:01
    __name__
  • 00:52:03
    equals "__main__".
  • 00:52:08
    all right
  • 00:52:11
    so we are going to have a spark
  • 00:52:13
    connection which is going to be uh
  • 00:52:16
    create spark connection we're going to
  • 00:52:18
    write the implementation shortly and
  • 00:52:21
    then in the create spark connection so
  • 00:52:23
    what we want to do is we want to create
  • 00:52:25
    an we want to establish connection with
  • 00:52:26
    the spark which is going to be a try
  • 00:52:30
    we're going to have a
  • 00:52:32
    let's say s_conn;
  • 00:52:34
    it's going to be a SparkSession,
  • 00:52:37
    so we connect the builder,
  • 00:52:40
    and then we do getOr-
  • 00:52:43
    Create.
  • 00:52:44
    I think we're still going to need this a
  • 00:52:46
    little bit and then we're going to have
  • 00:52:48
    the application name
  • 00:52:51
    so I'll say 'SparkDataStreaming',
  • 00:52:56
    and
  • 00:53:00
    we can chain another call in here,
  • 00:53:03
    we just have the config; this is where
  • 00:53:07
    things get very interesting. So with
  • 00:53:10
    Spark we need to specify the JAR files
  • 00:53:12
    that will be used so I'm going to be
  • 00:53:15
    using some packages in here
  • 00:53:18
    so two JARs will be used: one will
  • 00:53:21
    be used to create the connection between Spark
  • 00:53:23
    and Cassandra,
  • 00:53:26
    and the other one with Kafka. So we have
  • 00:53:30
    com.datastax
  • 00:53:33
    .spark,
  • 00:53:34
    the spark-cassandra
  • 00:53:38
    connector however if you want to get
  • 00:53:43
    access to them I think we can come into
  • 00:53:45
    Maven Repository
  • 00:53:47
    and we can search for Spark;
  • 00:53:50
    this is the Maven Repository,
  • 00:53:55
    the Maven Repository, and we can find the Cassandra
  • 00:54:01
    connector okay
  • 00:54:04
    I think we need to have spark Cassandra
  • 00:54:07
    connector
  • 00:54:09
    So from com.datastax we have a
  • 00:54:12
    connector from here so if you don't have
  • 00:54:14
    it I think you might need to download so
  • 00:54:17
    the latest one is 3.41
  • 00:54:20
    which I think is what we'll be using
  • 00:54:23
    so it's just the latest release so what
  • 00:54:26
    we need to do is we need to reference
  • 00:54:28
    this correctly: we have the spark-cassandra
  • 00:54:31
    connector, we need to get the Scala
  • 00:54:33
    version and then we get the
  • 00:54:36
    version that we'll be using.
  • 00:54:38
    all right so going back to the code we
  • 00:54:42
    are going to connect with the spark-
  • 00:54:46
    cassandra-connector,
  • 00:54:48
    the 2.13 build,
  • 00:54:52
    version 3.41,
  • 00:54:55
    and the other part which is going to be
  • 00:54:58
    oh
  • 00:54:59
    the other one which is going to be the
  • 00:55:01
    SQL Kafka,
  • 00:55:03
    the Spark SQL Kafka connector,
  • 00:55:08
    spark-sql-kafka.
  • 00:55:10
    yeah for this one
  • 00:55:12
    we have a
  • 00:55:14
    this is for Structured Streaming;
  • 00:55:16
    okay, so we have 3.4.1 for Scala 2.13. So if you
  • 00:55:21
    look at this guy
  • 00:55:23
    we have access to it and you can
  • 00:55:24
    download the JAR file here, include it
  • 00:55:26
    in your PySpark jars directory,
  • 00:55:31
    just in there, and you should
  • 00:55:33
    have access to this. So,
  • 00:55:35
    um,
  • 00:55:36
    this is the module, which is
  • 00:55:39
    spark-sql-kafka-0-10_2.13.
  • 00:55:43
    all right so going back to the code here
  • 00:55:46
    I'm going to have org.apache
  • 00:55:49
    .spark, and I'm going to have spark-
  • 00:55:53
    sql-kafka-
  • 00:55:55
    0-10_2.13,
  • 00:56:00
    3.41, okay,
  • 00:56:02
    2.13,
  • 00:56:05
    0-10... okay, _2.13:3.41.
  • 00:56:11
    Yeah, so these are
  • 00:56:15
    the two packages, the two JAR
  • 00:56:17
    files that will be needed, okay.
  • 00:56:19
    so the next one
  • 00:56:22
    is, we just need to configure the
  • 00:56:26
    host name for Cassandra, which is
  • 00:56:28
    going to be spark
  • 00:56:30
    .cassandra
  • 00:56:32
    .connection.host,
  • 00:56:36
    and we are going to be putting that as
  • 00:56:38
    localhost
  • 00:56:41
    all right
  • 00:56:44
    and
  • 00:56:46
    and that's it; this should establish a
  • 00:56:48
    connection for us. Because we are going
  • 00:56:50
    to be running this on Docker, this is
  • 00:56:52
    eventually going to be the broker,
  • 00:56:54
    but if you want to test this locally we
  • 00:56:57
    do a localhost for now
  • 00:57:01
    all right
  • 00:57:02
    so this should give us access to
  • 00:57:06
    to spark all right and then we just do
  • 00:57:09
    except
  • 00:57:11
    Exception as e,
  • 00:57:14
    we just log the error:
  • 00:57:17
    "we couldn't",
  • 00:57:21
    I'll just add,
  • 00:57:25
    "couldn't create
  • 00:57:27
    the spark connection",
  • 00:57:30
    maybe "session", due to the exception,
  • 00:57:36
    okay and just
  • 00:57:39
    pass the
  • 00:57:41
    the errors argument there so with this
  • 00:57:44
    we should be able to establish a
  • 00:57:46
    connection
  • 00:57:49
    all right and at this point we just do
  • 00:57:54
    s_conn
  • 00:57:57
    .sparkContext.setLogLevel, set the log level to
  • 00:58:01
    be, I think
  • 00:58:03
    I'll just do ERROR,
  • 00:58:06
    and log the info "Spark
  • 00:58:09
    connection created successfully".
  • 00:58:23
    so basically this is going to create a
  • 00:58:26
    spark connection for us
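A minimal sketch of what the function described here could look like, assuming the 3.4.1 / Scala 2.13 builds of both connectors and Cassandra reachable on localhost (the app name and variable names are assumptions):

```python
import logging
from pyspark.sql import SparkSession

def create_spark_connection():
    s_conn = None
    try:
        s_conn = (
            SparkSession.builder
            .appName("SparkDataStreaming")
            .config("spark.jars.packages",
                    "com.datastax.spark:spark-cassandra-connector_2.13:3.4.1,"
                    "org.apache.spark:spark-sql-kafka-0-10_2.13:3.4.1")
            .config("spark.cassandra.connection.host", "localhost")
            .getOrCreate()
        )
        # Keep the console quiet except for real problems
        s_conn.sparkContext.setLogLevel("ERROR")
        logging.info("Spark connection created successfully!")
    except Exception as e:
        logging.error(f"Couldn't create the Spark session due to exception {e}")
    return s_conn
```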
  • 00:58:29
    So again, to go over what we've just done:
  • 00:58:32
    we are creating a spark session with
  • 00:58:36
    this name
  • 00:58:37
    all right and then we are using these
  • 00:58:40
    Java packages
  • 00:58:41
    in this uh connection which is going to
  • 00:58:44
    be the 2.13 build of the spark-cassandra
  • 00:58:47
    connector, and the other part is going to
  • 00:58:49
    be spark-sql-kafka. So this creates a
  • 00:58:53
    connection for us on the,
  • 00:58:58
    yeah, okay,
  • 00:59:02
    and we initialize the connection to None first.
  • 00:59:08
    so this creates a connection for us so
  • 00:59:11
    the next thing we want to do
  • 00:59:13
    is we want to establish connection to
  • 00:59:18
    to Cassandra so now
  • 00:59:21
    in here where we are establishing once
  • 00:59:23
    we've established a spark connection the
  • 00:59:26
    next thing for us is to establish a
  • 00:59:28
    connection to Cassandra so we do that
  • 00:59:32
    with cluster so we just do cluster
  • 00:59:37
    I think we can have a Cluster in here,
  • 00:59:40
    which has been imported;
  • 00:59:42
    we connect to the localhost cluster.
  • 00:59:47
    all right
  • 00:59:49
    and this is going to be the uh
  • 00:59:52
    yeah connecting to the cluster
  • 00:59:57
    to Cassandra cluster all right
  • 01:00:02
    we're going to have a session
  • 01:00:05
    being created from the cluster we have
  • 01:00:07
    cluster.connect so this is going to
  • 01:00:09
    generate a session for us
  • 01:00:11
    and with this session we can return
  • 01:00:14
    session
  • 01:00:16
    I think we should do a try/except here:
  • 01:00:22
    except
  • 01:00:23
    Exception as e,
  • 01:00:26
    and log the error:
  • 01:00:31
    could not create
  • 01:00:35
    Cassandra
  • 01:00:36
    connection
  • 01:00:39
    due to
  • 01:00:44
    e
  • 01:00:45
    yeah
  • 01:00:46
    I'm going to
  • 01:00:48
    set the session here
  • 01:00:51
    to None on failure.
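A sketch of the Cassandra connection helper, assuming the cassandra-driver Cluster class and a single node on localhost:

```python
import logging
from cassandra.cluster import Cluster

def create_cassandra_connection():
    try:
        # Connect to the Cassandra cluster and open a session
        cluster = Cluster(["localhost"])
        cas_session = cluster.connect()
        return cas_session
    except Exception as e:
        logging.error(f"Could not create Cassandra connection due to {e}")
        return None
```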
  • 01:00:53
    and yeah so once we do that
  • 01:00:57
    if our session, if spark_conn,
  • 01:01:00
    is not None, then we continue, okay;
  • 01:01:05
    then,
  • 01:01:06
    if that is None,
  • 01:01:08
    I think if it is not None, then we
  • 01:01:10
    establish the Cassandra
  • 01:01:13
    connection
  • 01:01:14
    at this point. So we just do, this is
  • 01:01:17
    going to be
  • 01:01:19
    uh session
  • 01:01:23
    okay
  • 01:01:24
    I'm going to move this up here
  • 01:01:32
    I just don't know
  • 01:01:34
    okay
  • 01:01:40
    all right uh
  • 01:01:43
    this is shadowing the name outside, so
  • 01:01:47
    I'll just call this cassandra_session.
  • 01:01:53
    okay so if we have a connection we
  • 01:01:55
    return it, and if not we return None. So,
  • 01:01:59
    so if
  • 01:02:01
    the session is None,
  • 01:02:04
    I think we just break the
  • 01:02:06
    connection;
  • 01:02:13
    we just return and there's no need to
  • 01:02:15
    continue at this point
  • 01:02:18
    I think we just do nothing uh
  • 01:02:23
    oh, instead of that we just do "if it is
  • 01:02:26
    not None", so we only enter that if this
  • 01:02:29
    is not None. So what we want to do is we
  • 01:02:31
    want to create a key space okay with the
  • 01:02:33
    session that we have
  • 01:02:35
    and then we want to create a table with
  • 01:02:38
    our session
  • 01:02:41
    For the keyspace we just come in here,
  • 01:02:43
    we do a session.execute;
  • 01:02:55
    so even if we run this multiple times,
  • 01:02:58
    because we added this IF NOT
  • 01:03:01
    EXISTS clause, it only runs once.
  • 01:03:04
    So we do the same for the table.
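A sketch of those two idempotent DDL calls; the keyspace name, the replication settings, and the column list are assumptions that follow the random-user fields used elsewhere in the project:

```python
def create_keyspace(session):
    # IF NOT EXISTS makes this safe to run on every submission
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS spark_streams
        WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
    """)
    print("Keyspace created successfully!")

def create_table(session):
    session.execute("""
        CREATE TABLE IF NOT EXISTS spark_streams.created_users (
            id UUID PRIMARY KEY,
            first_name TEXT,
            last_name TEXT,
            gender TEXT,
            address TEXT,
            post_code TEXT,
            email TEXT,
            username TEXT,
            registered_date TEXT,
            phone TEXT,
            picture TEXT);
    """)
    print("Table created successfully!")
```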
  • 01:03:24
    Now, to insert the data
  • 01:03:27
    into Cassandra,
  • 01:03:31
    we are going to extract the
  • 01:03:35
    data from the kwargs.
  • 01:03:57
    All right, so this gives us access to
  • 01:04:01
    the data from there, and then we can try
  • 01:04:04
    to insert it now,
  • 01:04:06
    which is going to be a session.execute call.
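A sketch of the insert helper: each field is pulled out of the keyword arguments and bound into a parameterised INSERT (the column names are assumptions that match the table sketch above):

```python
import logging

def insert_data(session, **kwargs):
    logging.info("Inserting data...")
    try:
        session.execute("""
            INSERT INTO spark_streams.created_users (
                id, first_name, last_name, gender, address, post_code,
                email, username, registered_date, phone, picture)
            VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
        """, (
            kwargs.get("id"), kwargs.get("first_name"), kwargs.get("last_name"),
            kwargs.get("gender"), kwargs.get("address"), kwargs.get("post_code"),
            kwargs.get("email"), kwargs.get("username"),
            kwargs.get("registered_date"), kwargs.get("phone"), kwargs.get("picture"),
        ))
        logging.info(f"Data inserted for {kwargs.get('first_name')} {kwargs.get('last_name')}")
    except Exception as e:
        logging.error(f"Could not insert data due to {e}")
```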
  • 01:04:30
    then
  • 01:04:32
    the last bit is uh
  • 01:04:35
    the only thing we need to do is we need
  • 01:04:37
    to connect to Kafka
  • 01:04:39
    so we can extract that information from
  • 01:04:42
    Kafka. So the last part, I think, is going
  • 01:04:45
    to be
  • 01:04:47
    a def,
  • 01:04:51
    I think we need to connect to
  • 01:04:56
    to Kafka
  • 01:05:00
    so we are using the Spark connection,
  • 01:05:03
    and we are going to have a spark_df.
  • 01:05:27
    So basically what we're doing
  • 01:05:30
    here is we are connecting,
  • 01:05:33
    by using the Spark connection, to read
  • 01:05:36
    data from Kafka on this
  • 01:05:40
    server,
  • 01:05:41
    with the users_created topic, and
  • 01:05:44
    we're starting from the earliest offset, load
  • 01:05:46
    it, and we are returning that data frame
  • 01:05:48
    from there. So I think that's what we're
  • 01:05:52
    doing at that point.
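A sketch of that Kafka read, assuming the broker is reachable on localhost:9092 from where the job runs and that the topic is users_created:

```python
import logging

def connect_to_kafka(spark_conn):
    spark_df = None
    try:
        spark_df = (
            spark_conn.readStream
            .format("kafka")
            .option("kafka.bootstrap.servers", "localhost:9092")
            .option("subscribe", "users_created")
            .option("startingOffsets", "earliest")
            .load()
        )
        logging.info("Kafka dataframe created successfully")
    except Exception as e:
        logging.warning(f"Kafka dataframe could not be created because: {e}")
    return spark_df
```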
  • 01:05:54
    uh let's see what's the
  • 01:05:58
    yeah, of course, I think it's just
  • 01:06:01
    this parenthesis,
  • 01:06:06
    yeah, I think that's the last parenthesis
  • 01:06:08
    issue.
  • 01:06:11
    um
  • 01:06:13
    yeah I think that's pretty much it so
  • 01:06:15
    when we create the Cassandra connection
  • 01:06:17
    yeah
  • 01:06:19
    uh yeah that's all
  • 01:06:21
    so
  • 01:06:23
    I think we can test this out
  • 01:06:29
    Once we've connected to Kafka, we need to
  • 01:06:32
    get the data frame
  • 01:06:33
    from there,
  • 01:06:35
    so when we connected to spark
  • 01:06:39
    here
  • 01:06:41
    we need to get the data frame
  • 01:06:44
    so I'm going to call this df,
  • 01:06:48
    equal to connect_to_kafka with the spark
  • 01:06:50
    connection,
  • 01:06:52
    so connect to
  • 01:06:56
    Kafka with the Spark connection;
  • 01:07:00
    this is where we are creating
  • 01:07:03
    create spark connection
  • 01:07:07
    all right and then
  • 01:07:10
    this is pretty much descriptive:
  • 01:07:12
    we create a keyspace, create the
  • 01:07:15
    table.
  • 01:07:16
    So instead of inserting directly, I think
  • 01:07:19
    we need to do a write stream, so we're
  • 01:07:21
    not doing it just once, we are doing it
  • 01:07:23
    as a stream. So we just have a streaming
  • 01:07:26
    query
  • 01:07:27
    at this point which is going to be
  • 01:07:30
    I think we need to select the data
  • 01:07:36
    and do a selection_df; it's going to be,
  • 01:07:40
    I think,
  • 01:07:42
    we need to have a function,
  • 01:07:45
    because once we have this data frame we
  • 01:07:47
    need to structure it in a way to insert
  • 01:07:50
    into Cassandra, okay, and the way to do
  • 01:07:53
    that is to have struct fields. This is
  • 01:07:57
    going to be create_selection_-
  • 01:08:01
    df_from_kafka;
  • 01:08:06
    okay, and then we take a Spark data frame.
  • 01:08:10
    So we need the schema
  • 01:08:13
    at this point; it's going to have a Struct-
  • 01:08:17
    Type
  • 01:08:20
    with StructFields.
  • 01:08:46
    so what we're doing basically is to
  • 01:08:48
    create a schema so when we read data
  • 01:08:50
    from the spark data frame we are
  • 01:08:52
    converting whatever we read into this
  • 01:08:54
    particular schema and then we are
  • 01:08:56
    selecting data from it we are selecting
  • 01:08:58
    everything from that column so if you
  • 01:09:00
    have let's say hundred one thousand one
  • 01:09:03
    million records whatever
  • 01:09:05
    it's going to select all of them from
  • 01:09:07
    the Kafka queue right once it selects
  • 01:09:09
    them even if it is just one record
  • 01:09:12
    it selects them and formats them in this
  • 01:09:14
    particular schema and selects the data
  • 01:09:16
    from it to be inserted. So we just need
  • 01:09:19
    to do a selection
  • 01:09:22
    data frame, which is going to be
  • 01:09:25
    create_selection_df_from_kafka,
  • 01:09:28
    and then we're using the spark
  • 01:09:30
    data frame which is the DF that we're
  • 01:09:34
    getting from here
  • 01:09:40
    yeah we're using that
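A sketch of that selection step: the Kafka value column is cast to a string, parsed with from_json against a StructType schema, and flattened with a select on data.* (the field names are assumptions matching the table sketch):

```python
from pyspark.sql.functions import from_json, col
from pyspark.sql.types import StructType, StructField, StringType

def create_selection_df_from_kafka(spark_df):
    # Schema the incoming JSON payload is parsed against
    schema = StructType([
        StructField("id", StringType(), False),
        StructField("first_name", StringType(), False),
        StructField("last_name", StringType(), False),
        StructField("gender", StringType(), False),
        StructField("address", StringType(), False),
        StructField("post_code", StringType(), False),
        StructField("email", StringType(), False),
        StructField("username", StringType(), False),
        StructField("registered_date", StringType(), False),
        StructField("phone", StringType(), False),
        StructField("picture", StringType(), False),
    ])
    # Kafka delivers bytes: cast to string, parse the JSON, expand the struct
    return (spark_df.selectExpr("CAST(value AS STRING)")
            .select(from_json(col("value"), schema).alias("data"))
            .select("data.*"))
```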
  • 01:09:42
    and uh
  • 01:09:44
    yeah, so we just need to get our
  • 01:09:46
    selection data frame and do a write
  • 01:09:49
    stream from that,
  • 01:09:52
    in the format of where we are writing to:
  • 01:09:55
    org.apache.spark.sql.cassandra.
  • 01:10:02
    For the option, we need to have a
  • 01:10:05
    checkpoint location; this is not
  • 01:10:07
    strictly necessary, it's
  • 01:10:11
    not compulsory, but it's just to have a
  • 01:10:14
    checkpoint in case of failure.
  • 01:10:16
    And I'll just add another option:
  • 01:10:21
    we can have a keyspace
  • 01:10:26
    option; the keyspace is going to be
  • 01:10:29
    spark_streams,
  • 01:10:35
    because we just put everything in it,
  • 01:10:38
    yeah. So then we
  • 01:10:41
    just have another option:
  • 01:10:43
    the table is gonna be
  • 01:10:46
    created_users,
  • 01:10:48
    and the last thing we need to do is
  • 01:10:50
    start this streaming yeah
  • 01:10:54
    and that's all we need to do
  • 01:10:58
    um so
  • 01:11:02
    we can just do streaming_query.await-
  • 01:11:07
    Termination.
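Putting it together, a sketch of the entry point and the streaming write into Cassandra, using the checkpoint, keyspace, and table options just described (the checkpoint path is an assumption):

```python
if __name__ == "__main__":
    spark_conn = create_spark_connection()
    if spark_conn is not None:
        df = connect_to_kafka(spark_conn)
        selection_df = create_selection_df_from_kafka(df)

        session = create_cassandra_connection()
        if session is not None:
            create_keyspace(session)
            create_table(session)

            # Continuously write the parsed records into Cassandra
            streaming_query = (selection_df.writeStream
                               .format("org.apache.spark.sql.cassandra")
                               .option("checkpointLocation", "/tmp/checkpoint")
                               .option("keyspace", "spark_streams")
                               .option("table", "created_users")
                               .start())
            streaming_query.awaitTermination()
```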
  • 01:11:07
    there are some examples here
  • 01:11:10
    this one from Spark, and you can see
  • 01:11:14
    everything that I wrote in there is
  • 01:11:16
    somewhere here so if you have multiple
  • 01:11:18
    Brokers you do something like that yeah
  • 01:11:20
    so everything is like that so you can
  • 01:11:22
    just uh maybe come in here do a copy
  • 01:11:25
    paste and all that but yeah that's this
  • 01:11:27
    is where I got most of the code from and
  • 01:11:30
    I'm just replicating that in the
  • 01:11:33
    code.
  • 01:11:34
    Yeah, so let's spin back up our
  • 01:11:38
    Docker containers
  • 01:11:41
    and then detach them.
  • 01:11:43
    [Music]
  • 01:11:46
    I think we have an error somewhere
  • 01:11:49
    here,
  • 01:11:51
    hello,
  • 01:11:52
    yeah,
  • 01:11:53
    it's a comma.
  • 01:12:00
    No errors.
  • 01:12:02
    I can see one of those red
  • 01:12:06
    marks, but
  • 01:12:08
    this is fine, this is just
  • 01:12:11
    the keyspace name, so it's fine; it
  • 01:12:13
    couldn't recognize it, so it's okay.
  • 01:12:17
    Yeah, I think, while we wait for that to
  • 01:12:20
    come up, that's all we need to do.
  • 01:12:30
    Right, now that the installation is done,
  • 01:12:32
    our web server is back up and running,
  • 01:12:35
    and the scheduler is back, it's also up
  • 01:12:39
    and running; let's see, the scheduler is also
  • 01:12:42
    listening.
  • 01:12:43
    Also, we need to see why our
  • 01:12:46
    Control Center has gone down.
  • 01:12:49
    I think we just need to start it up
  • 01:12:50
    there's really no need
  • 01:12:52
    uh we don't need the control center at
  • 01:12:54
    this point because we know that our data
  • 01:12:56
    will always be produced and we don't
  • 01:12:58
    need to visualize them anymore
  • 01:13:00
    We don't even need the Schema Registry
  • 01:13:03
    anymore, because that's connected to the
  • 01:13:05
    Control Center. So we only need
  • 01:13:09
    Kafka, which is our broker, to
  • 01:13:13
    be up,
  • 01:13:13
    and to be able to connect to it.
  • 01:13:16
    so
  • 01:13:18
    let's see if our web server
  • 01:13:20
    is up and running
  • 01:13:24
    I guess it is we just do admin admin to
  • 01:13:27
    sign in
  • 01:13:31
    and uh we just go in here
  • 01:13:36
    to see the
  • 01:13:37
    DAG,
  • 01:13:39
    and we have it: stream data from API. Good.
  • 01:13:42
    so what we need to do
  • 01:13:45
    is we
  • 01:13:47
    we need to
  • 01:13:50
    check our Cassandra, whether Cassandra is
  • 01:13:53
    up and running, but we need to do a run of
  • 01:13:57
    this spark stream to see if we are able to
  • 01:13:59
    even connect to
  • 01:14:01
    Spark at all.
  • 01:14:06
    so it's going to run at the entry points
  • 01:14:09
    yeah
  • 01:14:14
    and try to establish a connection
  • 01:14:23
    Okay, so I think with these dependencies,
  • 01:14:26
    because I didn't have them before,
  • 01:14:29
    these two, I think it should
  • 01:14:33
    download them.
  • 01:14:35
    So it says the artifact does not exist,
  • 01:14:38
    unresolved dependencies: it couldn't see
  • 01:14:40
    spark-cassandra-connector_2.13:
  • 01:14:44
    3.41. Let's see, Cassandra connector, so
  • 01:14:48
    let's go into the Maven,
  • 01:14:50
    Maven repository, spark Cassandra
  • 01:14:52
    connector,
  • 01:14:54
    2.13, 3.41... oh, it's
  • 01:14:58
    3.4.1.
  • 01:15:00
    yeah
  • 01:15:02
    So that's it, we just need to
  • 01:15:04
    rerun this,
  • 01:15:14
    and I think
  • 01:15:17
    yeah, with this,
  • 01:15:20
    let's see if it connects to it.
  • 01:15:25
    There is a server error: mismatched in-
  • 01:15:29
    put in the file,
  • 01:15:30
    so we were establishing a connection,
  • 01:15:34
    trying to execute;
  • 01:15:38
    yeah, I didn't close these,
  • 01:15:41
    I didn't close that,
  • 01:15:45
    this,
  • 01:15:46
    this bracket.
  • 01:15:49
    I think did I do the same for the key
  • 01:15:52
    space
  • 01:15:53
    yeah the key space is fine it's only the
  • 01:15:55
    table so I need to just rerun that
  • 01:16:10
    all right so we're able to create a key
  • 01:16:12
    space
  • 01:16:14
    let's see we're able to create a key
  • 01:16:15
    space I'm able to create a key uh a
  • 01:16:18
    table
  • 01:16:20
    so the only thing that is left is that
  • 01:16:22
    uh
  • 01:16:23
    it says an error occurred while
  • 01:16:28
    publishing, due to com.data-
  • 01:16:31
    stax logging. So
  • 01:16:33
    There are two ways around this. So we
  • 01:16:36
    need to, it was when starting the
  • 01:16:38
    stream; so we need to go in,
  • 01:16:43
    because we are using these JAR
  • 01:16:47
    files
  • 01:16:49
    in here, we need to add a dependency. If
  • 01:16:52
    you look at our venv,
  • 01:16:54
    you know, I think,
  • 01:16:57
    in the lib,
  • 01:16:59
    in the lib folder, and you check pyspark,
  • 01:17:02
    we need to download those,
  • 01:17:06
    you need to download those
  • 01:17:08
    JAR files in there. If you look at the
  • 01:17:11
    jars in here,
  • 01:17:13
    we need to find
  • 01:17:15
    spark-sql-kafka,
  • 01:17:18
    and we can't find them here.
  • 01:17:21
    okay so we need to add them to this I
  • 01:17:25
    think I already did the download so I'm
  • 01:17:27
    going to copy paste them
  • 01:17:30
    and while I do that we can do uh we can
  • 01:17:34
    go into the Cassandra and uh
  • 01:17:37
    check. So we go into the Cassandra and
  • 01:17:40
    then we have the
  • 01:17:42
    interactive terminal; we use, this
  • 01:17:45
    is the terminal, this is the container
  • 01:17:49
    name,
  • 01:17:49
    now we're going into cqlsh with the
  • 01:17:51
    username of cassandra, password of
  • 01:17:53
    cassandra; localhost is
  • 01:17:57
    the IP and then this is the port.
  • 01:17:59
    So if you go into that we
  • 01:18:03
    have access to cqlsh and we can do
  • 01:18:06
    a describe
  • 01:18:11
    spark_streams.created_users,
  • 01:18:14
    and we have
  • 01:18:16
    created_users with the id as a UUID and the
  • 01:18:19
    rest, and these are the details of the
  • 01:18:22
    data that we have. So if we do a select
  • 01:18:25
    star from spark_streams.created_users,
  • 01:18:28
    we have an id, address and the rest of
  • 01:18:31
    the data, which is good. So we have access
  • 01:18:33
    to the Cassandra data directly. So all we
  • 01:18:38
    need to do now is by the time we are
  • 01:18:40
    streaming data from uh Kafka we need to
  • 01:18:45
    push this data to Cassandra so the only
  • 01:18:47
    dependencies that we we need to add
  • 01:18:50
    are those JAR files, which you need to
  • 01:18:52
    download. For now I will just exit and do
  • 01:18:55
    a docker compose down to take down all
  • 01:18:57
    these existing containers,
  • 01:18:59
    then we continue from there. I already copied
  • 01:19:01
    the JAR files, so what I'm going to do is
  • 01:19:03
    I'm going to copy-paste these into the
  • 01:19:06
    jars directory so we can have
  • 01:19:08
    dependencies on them.
  • 01:19:10
    it's asking me to click OK yeah I just
  • 01:19:13
    copy them. So the two JAR files have
  • 01:19:16
    been copied and you can see them there:
  • 01:19:18
    one is the spark-cassandra-connector and
  • 01:19:21
    then the other one is the spark-sql-kafka
  • 01:19:24
    connector.
  • 01:19:36
    what we need to do now is start up the
  • 01:19:38
    docker compose
  • 01:19:40
    Now we do docker compose up detached,
  • 01:19:43
    we can start up the docker compose;
  • 01:19:46
    the containers are starting up and you
  • 01:19:49
    can see that uh our web server is having
  • 01:19:51
    an error. What's the error? I suppose that
  • 01:19:54
    might be the entry point but let's see
  • 01:19:57
    The web server, if you click on that,
  • 01:20:01
    scroll to the bottom, and yeah, as I
  • 01:20:04
    suspected: command not found, "m". So if we
  • 01:20:07
    go back to the entry point, we have an "m"
  • 01:20:10
    where I'm trying to install/upgrade
  • 01:20:12
    pip; now just delete that and try again, I
  • 01:20:16
    guess
  • 01:20:17
    it's going to start the web server
  • 01:20:21
    and then we can check again I just clear
  • 01:20:23
    this
  • 01:20:24
    yeah okay I think it's starting up
  • 01:20:27
    so if you go outside of here
  • 01:20:30
    we need to stop the control center and
  • 01:20:35
    uh the schema registry
  • 01:20:38
    so once we stop the schema registry the
  • 01:20:41
    control center is not going to be coming
  • 01:20:42
    up because of the dependency.
  • 01:20:44
    anyways so the next thing we need to do
  • 01:20:46
    is we need to go to the UI and check our
  • 01:20:50
    Spark
  • 01:20:51
    the status of our spark session so going
  • 01:20:55
    back to
  • 01:20:56
    the UI, you go to localhost:9090 and just
  • 01:21:00
    refresh this
  • 01:21:02
    yeah and you can see our spark Master
  • 01:21:05
    the IP address which is the localhost
  • 01:21:07
    really is just the IP address of the
  • 01:21:09
    Docker container itself that is running it;
  • 01:21:11
    so we have the worker ID you have
  • 01:21:13
    there's no running application no
  • 01:21:15
    completed application and you can see
  • 01:21:18
    that so we go back and then we submit a
  • 01:21:20
    job our first job to that but before we
  • 01:21:22
    do that I think we need to comment out
  • 01:21:24
    the streaming part of the Spark
  • 01:21:27
    script,
  • 01:21:28
    where we're streaming into Cassandra; for
  • 01:21:30
    now I'll just comment these two out.
  • 01:21:35
    yeah
  • 01:21:36
    Then we can try; we want to see whether
  • 01:21:38
    the keyspace and the table will be
  • 01:21:40
    automatically created
  • 01:21:54
    if we do a Refresh on the UI yeah okay
  • 01:21:57
    so we can see the ID
  • 01:21:59
    the ID is there, and the name is SparkData-
  • 01:22:03
    Streaming; it's been running for 14
  • 01:22:04
    seconds.
  • 01:22:05
    so if you click on the the application
  • 01:22:08
    you can see the worker that is running
  • 01:22:09
    it and the worker ID, the number of cores,
  • 01:22:13
    the memory and the rest
  • 01:22:15
    So going back to the terminal now, we can see
  • 01:22:17
    the keyspace successfully created and
  • 01:22:19
    the table created successfully, but the
  • 01:22:23
    connection, yeah, the connection, it says,
  • 01:22:26
    it's showing an error; what's going on?
  • 01:22:29
    let's look into the docker container
  • 01:22:32
    So I think we just need to do docker
  • 01:22:36
    exec -it to see what is going on,
  • 01:22:39
    and I think the script is fine; I'll just
  • 01:22:42
    do docker exec and check it,
  • 01:22:47
    and I'll do a select star from the
  • 01:22:53
    spark_streams table.
  • 01:22:55
    Yeah, I think it's fine; we just exit
  • 01:22:57
    that, we'll resubmit the job and check
  • 01:23:01
    again if it is fine or not.
  • 01:23:09
    okay
  • 01:23:10
    I think it's coming back up
  • 01:23:18
    yeah let's check can we see any error it
  • 01:23:20
    says successfully started the service
  • 01:23:23
    connecting to master
  • 01:23:26
    Yeah, I think it successfully
  • 01:23:28
    connected, and if you look at the data
  • 01:23:31
    frame that we created, which is the
  • 01:23:32
    structure going into Spark, that is fine, and
  • 01:23:35
    Cassandra is fine, the data frame is okay,
  • 01:23:38
    and yeah, I think we are good,
  • 01:23:43
    really
  • 01:23:44
    right
  • 01:23:46
    so the next thing we need to do is we
  • 01:23:50
    need to
  • 01:23:52
    submit a job and enable the streaming to
  • 01:23:55
    Cassandra
  • 01:23:57
    so if we refresh this check the UI and
  • 01:24:00
    refresh,
  • 01:24:02
    I just delete this and go to localhost:9090,
  • 01:24:06
    refresh this part.
  • 01:24:08
    I think we need to log in because we
  • 01:24:10
    already spun it down, right, okay, we
  • 01:24:13
    don't;
  • 01:24:14
    I'll click on that, okay, let's start it,
  • 01:24:21
    trigger the DAG.
  • 01:24:24
    So it's coming up.
  • 01:24:27
    Now that the keyspace and the table have
  • 01:24:30
    been created, we can comment this part
  • 01:24:33
    and resubmit the job. So I added a logging
  • 01:24:37
    info so we can see when the streaming is
  • 01:24:39
    being started, and you can see the key-
  • 01:24:41
    space and the table have been created;
  • 01:24:43
    I added this part.
  • 01:24:45
    and then so what we're going to be doing
  • 01:24:47
    is we're going to be submitting this job
  • 01:24:50
    back to Apache Spark,
  • 01:24:55
    then
  • 01:24:57
    simultaneously we're going to trigger
  • 01:24:59
    the DAG so we can start seeing the data
  • 01:25:02
    that has been produced into the Kafka
  • 01:25:04
    queue and this is the last one that was
  • 01:25:07
    running so I'm going to retrigger this
  • 01:25:09
    so we have a second run and once that is
  • 01:25:12
    done, we do a double check on
  • 01:25:16
    Cassandra to see if we are
  • 01:25:18
    getting streams of data into Cassandra
  • 01:25:20
    well right now
  • 01:25:22
    right now we need to get the data into
  • 01:25:25
    Kafka first
  • 01:25:27
    by triggering this dag which is
  • 01:25:29
    currently running
  • 01:25:30
    then we go from there into into
  • 01:25:34
    Cassandra
  • 01:25:38
    I'm going to open a new terminal
  • 01:25:42
    We're doing
  • 01:25:45
    an execution into the terminal of
  • 01:25:48
    Cassandra, so I'm going to do a docker
  • 01:25:50
    exec -it into Cassandra, and I'm
  • 01:25:54
    going to be running the CQL statement
  • 01:25:58
    that we're going to be using. So the
  • 01:26:02
    producing is done
  • 01:26:04
    when we trigger the DAG. So the next
  • 01:26:06
    thing we need to do is check; I'll
  • 01:26:08
    just do a select star from there, select
  • 01:26:12
    star from,
  • 01:26:14
    that would be spark_streams
  • 01:26:17
    dot
  • 01:26:19
    created_users,
  • 01:26:22
    yeah, and you can see we have initially
  • 01:26:25
    59 rows.
  • 01:26:27
    if we go in there and do a select star
  • 01:26:30
    from,
  • 01:26:31
    I'm going to just select the first name
  • 01:26:33
    and the last name, because I think select star is
  • 01:26:36
    a little bit much, so I'll just do select
  • 01:26:39
    first_name
  • 01:26:40
    and last_name from
  • 01:26:43
    spark_streams.created_users,
  • 01:26:48
    and you can see,
  • 01:26:49
    well, the first name and the last
  • 01:26:52
    name of the users that have been
  • 01:26:53
    streamed into Cassandra.
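If you would rather verify from Python than from cqlsh, a hypothetical quick check with the same cassandra-driver session could look like this:

```python
from cassandra.cluster import Cluster

# Quick sanity check of what has landed in Cassandra (assumed connection details)
cluster = Cluster(["localhost"], port=9042)
session = cluster.connect()
rows = session.execute(
    "SELECT first_name, last_name FROM spark_streams.created_users LIMIT 10;")
for row in rows:
    print(row.first_name, row.last_name)
```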
  • 01:26:57
    So basically that's how to do it, and if we trigger
  • 01:27:01
    more, we see more records, and yeah,
  • 01:27:05
    that kind of caps it off.
  • 01:27:07
    basically I think we've satisfied all
  • 01:27:09
    the items on our architecture we have
  • 01:27:11
    the API and from API to Cassandra we
  • 01:27:15
    started with Apache Airflow, which is
  • 01:27:17
    pulling the data from the API, and
  • 01:27:21
    once we trigger that process it streams
  • 01:27:23
    data into Kafka then to Apache spark
  • 01:27:26
    then into Cassandra. And there you have
  • 01:27:29
    it, folks: an end-to-end data engineering
  • 01:27:32
    pipeline from data ingestion to
  • 01:27:35
    processing and storage all containerized
  • 01:27:38
    with Docker. If you found this video useful, don't
  • 01:27:41
    forget to give it a thumbs up and
  • 01:27:43
    subscribe for more content like this
  • 01:27:44
    until next time happy engineering
Tags
  • Data Engineering
  • Apache Airflow
  • Kafka
  • Cassandra
  • Docker
  • PostgreSQL
  • Zookeeper
  • Spark
  • Random User API
  • Containerization