
Parsing data

Parsing is done using Apache Beam’s pipeline SDK. Each pipeline is created as a “streaming” pipeline with an unbounded source: data arrives through a Kafka connector subscribed to a single topic, and each pipeline parses one specific data format from that topic. The topics are listed below.

| Supplier | Product topic    | Store topic    |
|----------|------------------|----------------|
| Target   | target.products  | target.stores  |
| Walmart  | walmart.products | walmart.stores |

The final step of each pipeline takes the output of the previous step and writes it to the MySQL database instance using a JDBC connector.

Specific data parsing

Information on how each pipeline parses its data can be found on the appropriate page.

Running pipelines

The pipelines are bundled into a “fat JAR” (shadow JAR), which is built with a Gradle plugin by running gradle(w) shadowJar. For the JAR to be bundled properly, the Flink runner JAR must be placed within the MANIFEST so that the Beam SDK can find it. Once the JAR is built, copy it to the Flink Job Manager container and submit the job using the Flink CLI, passing in the pipeline's class name (instructions can be found below). Each pipeline must be submitted to the Flink Job Manager as a separate job. For convenience, a script that deploys all the pipelines has been created for both Windows and Unix systems.

Submitting a job

Linux:

$ JOB_CLASS_NAME="shoppinglist.beam.products.pipelines.TargetParse"
# Look up the ID of the running Flink Job Manager container.
$ JM_CONTAINER=$(docker ps --filter name=jobmanager --format="{{.ID}}")
# Copy the fat JAR into the container, then submit the pipeline in detached mode.
$ docker cp beam-pipelines.jar "${JM_CONTAINER}":/beam-pipelines.jar
$ docker exec -t -i "${JM_CONTAINER}" flink run -d -c "${JOB_CLASS_NAME}" /beam-pipelines.jar \
--runner=FlinkRunner --bootstrapServers=localhost:9092 --inputTopics=target.products
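
Since every pipeline has to be submitted as its own job, the commands above lend themselves to a small loop. The following is only a sketch of what such a helper might look like, not the project's actual deployment script: the function names `run_in_jm` and `submit_all`, the `class=topic` argument format, and the `DRY_RUN` switch are all hypothetical. It assumes `JM_CONTAINER` is already set and the JAR has been copied to `/beam-pipelines.jar` in the container.

```shell
#!/usr/bin/env bash
# Hypothetical helper: submit several pipelines from one fat JAR,
# one Flink job per pipeline class.

# Run a command inside the Job Manager container.
# With DRY_RUN=1 the command is printed instead of executed.
run_in_jm() {
  if [ "${DRY_RUN:-0}" = "1" ]; then
    echo "$*"
  else
    docker exec "${JM_CONTAINER}" "$@"
  fi
}

# Submit every "class=topic" pair passed as an argument.
submit_all() {
  local pair class_name topic
  for pair in "$@"; do
    class_name="${pair%%=*}"   # text before the first "="
    topic="${pair#*=}"         # text after the first "="
    run_in_jm flink run -d -c "${class_name}" /beam-pipelines.jar \
      --runner=FlinkRunner --bootstrapServers=localhost:9092 \
      "--inputTopics=${topic}"
  done
}
```

For example, `DRY_RUN=1 submit_all "shoppinglist.beam.products.pipelines.TargetParse=target.products"` prints the `flink run` command without contacting Docker. Further pairs for the other topics can be appended once their class names are known.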

Windows:

docker ps --filter name=jobmanager --format="{{.ID}}" > tmpFile
set /p JM_CONTAINER=< tmpFile
del tmpFile
set JOB_CLASS_NAME="shoppinglist.beam.products.pipelines.TargetParse"
docker cp beam-pipelines.jar %JM_CONTAINER%:/beam-pipelines.jar
docker exec -t -i "%JM_CONTAINER%" flink run -d -c %JOB_CLASS_NAME% /beam-pipelines.jar --runner=FlinkRunner --bootstrapServers=localhost:9092 --inputTopics=target.products

Monitoring pipelines

Because the pipelines run on Flink, their metrics can be viewed on the Flink dashboard, located at localhost:8081.
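
The same port also serves Flink's REST API, so job status can be checked from a terminal as well. The snippet below is a minimal sketch under that assumption; the `FLINK_URL` variable and the function names are hypothetical, while `/jobs/overview` is the REST endpoint that returns a JSON summary of all jobs.

```shell
#!/usr/bin/env bash
# Base address of the Flink dashboard / REST API (assumed default).
FLINK_URL="${FLINK_URL:-http://localhost:8081}"

# Build the URL of the job overview endpoint, tolerating a trailing slash.
jobs_overview_url() {
  printf '%s/jobs/overview\n' "${FLINK_URL%/}"
}

# Fetch the JSON overview of all jobs; fails if the server returns an error.
fetch_jobs_overview() {
  curl --silent --fail "$(jobs_overview_url)"
}
```

Calling `fetch_jobs_overview` prints the raw JSON, which can be piped into a JSON tool of your choice to check the state of each job.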

Stopping pipelines

There are multiple ways to stop pipelines. In the dashboard GUI, you can click on a running pipeline and choose to stop it. To force all pipelines to shut down, you can shut down the Docker containers.
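
A further option, not described above, is the Flink CLI inside the Job Manager container: `flink list -r` lists running jobs and `flink cancel <jobId>` cancels one. The sketch below combines the two. The function names are hypothetical, `JM_CONTAINER` is assumed to be set as in the submission steps, and job IDs are picked out of the listing purely by their shape (32 hexadecimal characters), which would misfire if a job name happened to contain such a string.

```shell
#!/usr/bin/env bash
# Read `flink list` output on stdin and print each 32-character
# hexadecimal job ID once.
extract_job_ids() {
  grep -oE '[0-9a-f]{32}' | sort -u
}

# Cancel every running job reported by the Job Manager.
cancel_all_jobs() {
  docker exec "${JM_CONTAINER}" flink list -r \
    | extract_job_ids \
    | while read -r job_id; do
        docker exec "${JM_CONTAINER}" flink cancel "${job_id}"
      done
}
```

Unlike shutting down the containers, this leaves the Flink cluster itself running, so pipelines can be resubmitted right away.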