00:00:00
Hi all. As part of this session we are going to learn how to run a Google Cloud Dataflow batch processing pipeline. I would like to concentrate mostly on the demo part; the theoretical aspects you can learn from the official Google Cloud documentation. All right, let's move on.
00:00:36
As part of this demo there are some prerequisites and assumptions: whoever is interested in learning should already have a basic understanding of the GCP services listed below and a basic understanding of Python programming. We will also look at each of the GCP services used in this demo at a high level.
00:01:10
The first service is Dataflow. Dataflow belongs to the GCP big data processing stack; it is mainly used for processing huge volumes of data, and it provides unified batch and streaming data processing. Alongside it we use the Apache Beam SDK, a programming model used to create pipelines that can run on Dataflow; it is supported in different programming languages: Java, Python, and Go. By the way, Apache Beam is not a GCP service; it is an open-source framework.
00:01:58
The next service is BigQuery. BigQuery is an enterprise data warehouse on the GCP platform. You can interact with BigQuery using standard SQL and query petabyte-scale data; since it is a data warehouse, you can build any kind of data warehouse in BigQuery.
00:02:27
GCS, Google Cloud Storage, comes under the storage services: you can store any type of data, structured or unstructured, in Google Cloud Storage.
00:02:44
Then we are going to use Cloud Shell. Cloud Shell is an interactive shell available in the Google Cloud platform; you can interact with any other Google Cloud service from it using CLI commands. It also comes with some pre-installed software, so you can build and run your code there, either in Python or Java.
00:03:19
The next service is Pub/Sub. Pub/Sub is a global messaging service; it works on the publisher/subscriber model and allows services to communicate asynchronously.
00:03:35
Then we are going to use Cloud Scheduler, a fully managed cron job service: you can schedule any job using standard cron syntax. These are the different services I am going to use as part of this demo, in which we will see batch processing using Dataflow.
00:04:01
This is the use case. What are we trying to do as part of this demo? First of all, there is some data in a CSV file placed in a GCS bucket; you can see the reference architecture at the top of the slide. We are going to read that data, perform a series of transformations (I will explain those in detail in the coming slides), and then write the final, formatted data into BigQuery.
00:04:41
Before going to the demo, I would like to explain the command used to run this pipeline and also the code. First I will explain the transformations we are going to perform as part of this pipeline using some sample data; then I will come back to this slide and explain how we are going to run this particular command.
00:05:14
So let me go to the sample data; it looks like this. It has almost seven to eight columns, but we are not going to use all of them in our pipeline; we are mainly focusing on three columns: the symbol, the buy/sell flag, and the quantity traded. The first transformation we perform on top of this data is to read the symbol, the flag, and the quantity separately.
00:05:56
In the second transformation, based on the flag: if it is a buy, we append nothing before the quantity; if it is a sell, we append a minus sign before the quantity. Then, in the third transformation, we group these values by symbol; the first symbol has three values and the second has two values after grouping. Finally, we sum up the values within each group. This is the final data we are going to insert into a BigQuery table.
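As a rough plain-Python illustration of these steps (the records below are made up for the example, not taken from the actual file):

```python
# Made-up sample records: (symbol, buy/sell flag, quantity traded)
records = [
    ("INFY", "BUY", 100),
    ("INFY", "SELL", 40),
    ("INFY", "BUY", 10),
    ("TCS", "SELL", 30),
    ("TCS", "BUY", 50),
]

# Transformations 1 and 2: keep symbol and quantity, negating the quantity for sells
signed = [(sym, qty if flag == "BUY" else -qty) for sym, flag, qty in records]

# Transformations 3 and 4: group by symbol and sum each group
totals = {}
for sym, qty in signed:
    totals[sym] = totals.get(sym, 0) + qty

print(totals)  # {'INFY': 70, 'TCS': 20}
```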
00:06:43
I hope you understood what we are going to perform on top of this data. Now I will try to explain the code and also the pipeline command; let's move on to the slide again.
00:07:00
Okay, so this is the batch pipeline command; I will try to explain each part of it. The first thing is bulk_deal_hgr, the Python program, or pipeline, where our pipeline code lives. It takes a certain number of input arguments: the input, output, project, region, staging location, temp location, and the runner. The input is where we read our input data from; here it is a GCS bucket containing a CSV file. The output argument is also mandatory, but in my pipeline I just defaulted it to a path; since this pipeline writes its final data into BigQuery, you don't have to concentrate much on that particular output path.
00:08:02
Then there are the project name, which you have to pass, and the region, which is a very important parameter; if you don't pass any of these input arguments it will throw an error or exception. The region matters because if you don't mention it, Google Cloud doesn't know in which region it has to spin up your Dataflow workers, so you definitely have to pass this argument. I am asking Google Cloud to spin up my Dataflow workers in the asia-south2 region. The usual practice is to select your nearest region; in India we have Mumbai and Delhi available, and I am selecting Delhi. You also have to specify the staging location and the temp location in a GCS bucket. These two are very important because, while running a pipeline, Dataflow will create some temporary files and logs and store them in those two locations. Finally, the runner: here you have to specify the Dataflow runner; if you are testing the pipeline locally you would specify the direct runner instead.
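For reference, here is a minimal sketch of how the same options can be supplied to the Beam Python SDK programmatically; the project ID and bucket paths are placeholders, and on the command line the equivalent values are passed as --input, --output, --project, --region, --staging_location, --temp_location and --runner flags.

```python
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values, not the demo's real project or buckets.
options = PipelineOptions(
    project="my-gcp-project",
    region="asia-south2",                       # Delhi, the region chosen in the demo
    staging_location="gs://my-bucket/staging",  # Dataflow staging files
    temp_location="gs://my-bucket/temp",        # Dataflow temporary files and logs
    runner="DataflowRunner",                    # use "DirectRunner" to test locally
)
```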
00:09:29
Let's move on to the code. This is our pipeline code. The first part of the pipeline imports the Python modules required for it.
00:09:50
Then we use a certain number of transformations available in Apache Beam. The first one is ParDo; its full name is "parallel do". It works like this: whenever you would like to process a huge amount of data, and this is basically why we go for Dataflow, there are three important aspects I would like to explain. First, Dataflow can process huge amounts of data in a parallel and distributed manner. Why? Because when you submit your workload, Dataflow can scale up the number of workers based on that workload; it can scale from one to a thousand workers. You can even control the number of workers through your input arguments by specifying how many workers should be used for your data processing pipeline. As for what ParDo does: it is meant for user-defined transformations; whenever you would like to write your own transformation and have it processed in parallel, you use the ParDo transformation.
00:11:19
Here I am using ParDo to read three columns from my input data and then, based on the flag, append a plus or minus sign before the quantity; that's what I am doing.
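A minimal sketch of what such a DoFn can look like is shown below; the class name, column positions, and flag values are assumptions for illustration, not the exact code from the demo.

```python
import apache_beam as beam

class SplitAndSignFn(beam.DoFn):
    """Illustrative DoFn (assumed name): keep the symbol, the buy/sell flag
    and the traded quantity, and negate the quantity for sell records."""
    def process(self, element):
        fields = element.split(",")
        # Assumed column positions in the CSV line.
        symbol, flag, qty = fields[0], fields[3], fields[4]
        quantity = int(qty)
        if flag.strip().upper() == "SELL":
            quantity = -quantity
        yield "{},{}".format(symbol, quantity)
```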
00:11:38
Then we have a run method; this is the entry function for your Apache Beam pipeline. In the first place we have to specify our input arguments: since we are reading the input and output arguments from the command line, we default the input and output to some GCS bucket paths. I am using a Python-specific module to do that, argparse.
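A rough sketch of such an entry function, assuming argparse and a couple of placeholder GCS defaults:

```python
import argparse
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run(argv=None):
    # Pipeline-specific arguments; anything unknown (project, region,
    # runner, staging/temp locations, ...) is handed to Beam as pipeline options.
    parser = argparse.ArgumentParser()
    parser.add_argument("--input",
                        default="gs://my-bucket/bulk_deals.csv")    # placeholder default
    parser.add_argument("--output",
                        default="gs://my-bucket/output/results")    # defaulted, unused here
    known_args, pipeline_args = parser.parse_known_args(argv)

    with beam.Pipeline(options=PipelineOptions(pipeline_args)) as p:
        lines = p | "Read CSV" >> beam.io.ReadFromText(known_args.input)
        # ... the transformations and the BigQuery write follow here ...
```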
00:12:13
Then I have some simple functions written to format my intermediate data while performing the series of transformations. One of them sums the groups: it reads the symbol and the grouped values, and sums those values per symbol.
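A small sketch of what that summing helper might look like; the function name and the string output format are assumptions:

```python
def sum_groups(kv):
    """Hypothetical helper: given ('SYMBOL', [q1, q2, ...]) as produced by a
    GroupByKey, return 'SYMBOL,total' as a plain string."""
    symbol, quantities = kv
    return "{},{}".format(symbol, sum(int(q) for q in quantities))
```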
00:12:37
I also have a parse method. This is a simple method used while converting the final data into a BigQuery-readable format: the BigQuery sink can't take just any kind of data, so you have to prepare the data in a form BigQuery can read, which is JSON or dictionary-format rows. This method parses the string-formatted data into that dictionary (JSON) format.
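For illustration, a possible shape of that parse helper; the field names are assumed, not the demo's actual schema:

```python
def to_bq_row(line):
    """Hypothetical parser: turn a 'SYMBOL,total' string into the dict that
    the BigQuery sink expects, e.g. {'symbol': 'INFY', 'net_qty': 70}."""
    symbol, total = line.split(",")
    return {"symbol": symbol, "net_qty": int(total)}  # assumed column names
```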
00:13:16
Here you can see my pipeline: a series of transformations applied one after the other. The ParDo I have already explained. Once I read the data, I convert it to tuples, then I do the group-by, then the summing, then I convert back to string, and finally I convert the data into dictionary (JSON) format. In the final step, I write the data into a BigQuery table.
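Put together, the chain of transforms might look roughly like the sketch below; it reuses the hypothetical helpers from the earlier sketches, and the step labels are placeholders rather than the demo's exact names.

```python
# Inside the run() function sketched earlier, reusing SplitAndSignFn,
# sum_groups and to_bq_row from the previous sketches.
(
    p
    | "Read CSV"        >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
    | "Split and sign"  >> beam.ParDo(SplitAndSignFn())
    | "To tuple"        >> beam.Map(lambda s: tuple(s.split(",")))
    | "Group by symbol" >> beam.GroupByKey()
    | "Sum per symbol"  >> beam.Map(sum_groups)
    | "To BQ row"       >> beam.Map(to_bq_row)
    # ... followed by the BigQuery write, sketched after the next paragraph
)
```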
00:13:52
For the BigQuery write I have mentioned a certain number of arguments: the BigQuery table name, the dataset name, the project, and the schema of the table. There are two things here: if my table does not already exist in BigQuery, I am asking it to create it; if the table already exists, then whenever I insert data it should truncate the previous data first and then insert.
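A hedged sketch of that final write step, with placeholder table, dataset, project, and schema values:

```python
import apache_beam as beam

# Placeholder table/dataset/project/schema values, not the demo's real ones.
write_to_bq = beam.io.WriteToBigQuery(
    table="bulk_deals_summary",
    dataset="demo_dataset",
    project="my-gcp-project",
    schema="symbol:STRING,net_qty:INTEGER",
    # Create the table if it does not exist yet ...
    create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
    # ... and truncate any existing rows before inserting the new ones.
    write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
)
```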
00:14:19
So this is the pipeline; now let us go and run it. This is the Cloud Shell environment I already explained; here I am submitting the command to run my pipeline. Let me submit that and authorize.
00:14:57
Now my pipeline has started running, and you can slowly observe, one by one, what is happening; then we will move on to the Dataflow console in the GCP platform and observe what is happening over there. Okay, now it has started; let's wait for some time. You can see the job state: initially the job state will be pending, then it slowly checks for worker availability in that region. If a worker is available, it clearly says "starting one worker in asia-south2". Let's move on to the Dataflow window.
00:15:51
Let me refresh this page. You can see there is one job in the running state. This is the Dataflow console, and this is the jobs page. Click on the job, and here you can see the in-depth details of the Apache Beam pipeline running on Dataflow. This is a graphical representation of your entire pipeline: we labeled each transformation in the code, so each one can be represented graphically in the Dataflow console.
00:16:43
You can see here that it has now started reading data from the GCS bucket. It will take some time, and once everything is done you will see your data written into BigQuery. Let's go to the BigQuery window as well: this is BigQuery, and this is the dataset created. Once the pipeline completes its run, it will create a table with the transformed data, so we will have to wait until it completes. I am refreshing this page as well.
00:17:36
Right now I don't see any table at all, because the job is still running and it will take some time. Spinning up the Dataflow workers takes three to four minutes on average: Dataflow has to check the availability of the workers, check some prerequisites, spin the workers up, and then process the data as per our pipeline. Let's go back to the Dataflow console.
00:18:16
Here on the right side of the panel you can see the job information in detail. My job type is batch. So how do we differentiate batch and streaming? Again, we have to go back to the code: while defining the pipeline, specifically in the pipeline options, there is a streaming parameter. If you don't specify anything, by default the job is considered a batch job; if you set the streaming option to true, it is considered a streaming job.
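In the Apache Beam Python SDK this is the streaming pipeline option; a minimal sketch of toggling it:

```python
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

options = PipelineOptions()
# streaming defaults to False, so the job runs on Dataflow as a batch job;
# setting it to True (or passing --streaming on the command line) makes it
# a streaming job instead.
options.view_as(StandardOptions).streaming = True
```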
00:18:52
That's why it shows a batch job running, along with the SDK version, the region it is running in, and how many workers it is using. Now you can see the steps turning green one by one: light green means a step is still running, dark green means it has completed. You can see it is now trying to write the data into BigQuery; it is in the final stage.
00:19:26
This is the hardware-related information pertaining to the workers: how many CPUs are being used, the HDD size, and the amount of shuffle data. Here we also have the job name, project, region, runner (Dataflow runner), and the staging and temp locations; these come from our input arguments. Now you can see everything has turned green, which means the job is completed.
00:19:56
Now I should see a table created in my BigQuery dataset, and I should see that data. You can see this table was created recently. Go to the table, check the table size, and in the preview you can see our transformed, formatted data: the symbol and the summed-up value.
00:20:37
So this is how you can run batch pipelines in Dataflow. If you have any questions, you can always come back to me. Thank you very much.