Inventory File Loader

How-to Guide

Author:

Fluent Commerce

Changed on:

9 Oct 2023

Key Points

  • The SDK lets you import data into Fluent OMS via batch jobs.
  • The data can be stored in an Amazon S3 bucket and imported into Fluent OMS via the API.
  • The inventory job handler requires several settings that must be defined in the account's settings list (location-pattern, archive-folder, and so on).

Overview

In this sample project, we will go through some basic features of the SDK by creating a batch job that reads resources (from a file path, S3 …) and then loads them into the Fluent system.

These are the topics covered by this guide:

  • Spring Batch overview
  • Create a new Batch Job Handler

Prerequisites

  • localstack container is running
  • You have access to a Fluent account

There is a script, `localstack-setup.sh`, bundled with the project that can be used to set up localstack. The script requires some parameters to be configured before it can be executed.

It is best to do the following:
First, run this command to open a session with the localstack container

docker exec -it localstack /bin/bash

Then use the commands below to create the secrets, making sure to set the variables $ACCOUNT, $RETAILER, $USERNAME, $PASSWORD and $REGION first. The S3 commands also use $SAMPLE_FILE, $BUCKET and $KEY. Valid region values are: sydney, dublin, singapore or north_america.

docker exec -d localstack awslocal secretsmanager create-secret --name fc/connect/inventory-file-loader/api/fluent/activeAccounts --secret-string "{\"accounts\":[{\"name\":\"$ACCOUNT\", \"region\":\"$REGION\", \"retailers\":[$RETAILER]}]}" ;
docker exec -d localstack awslocal secretsmanager create-secret --name fc/connect/inventory-file-loader/$ACCOUNT/api/fluent-account/$RETAILER --secret-string "{\"retailer\":\"1\", \"userName\":\"$USERNAME\", \"password\":\"$PASSWORD\"}";

docker cp $SAMPLE_FILE localstack:/$SAMPLE_FILE
docker exec -d localstack awslocal s3api create-bucket --bucket $BUCKET
docker exec -d localstack awslocal s3api put-object --bucket $BUCKET --key $KEY --body /$SAMPLE_FILE

Use ctrl + D to exit the localstack session.

Spring Batch overview

Spring Batch is a Java framework for writing and executing multi-step, chunk-oriented batch jobs. It provides reusable functions essential in processing large volumes of records, including logging/tracing, transaction management, job processing statistics, job restart, skip, and resource management.

Architecturally, Spring Batch is divided into three main components:

  • Job: Represents the batch job and contains one or more steps.
  • Step: Represents an individual unit of work within a job, typically defined as a single chunk-oriented task, such as reading data from a database, transforming the data, and writing the results to a file.
  • Processing: These components define the reading, processing, and writing of individual data items within a step.
    • Chunk-oriented processing: A step-based approach in which data is read, processed, and written in small chunks.
    • Tasklet-based processing: A single-step approach in which a single task is executed. Tasklets are typically used for operations such as cleaning up resources or initializing a database. Tasklet-based processing is best suited to scenarios where a single, well-defined task needs to be executed and large amounts of data are not necessarily involved.
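To make the chunk-oriented model concrete, here is a minimal plain-Java sketch. It is not the Spring Batch API: the class `ChunkSketch` and the method `processInChunks` are hypothetical names used only for illustration. It shows how items are buffered until the chunk size is reached and then handed to a writer as one unit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java illustration of chunk-oriented processing; not the Spring Batch API. */
final class ChunkSketch {

    /**
     * Reads items one by one, buffers them, and hands each full buffer
     * (plus the final partial one) to the writer as a single chunk.
     * Returns the number of chunks written.
     */
    static <T> int processInChunks(Iterable<T> reader, int chunkSize, Consumer<List<T>> writer) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<T> buffer = new ArrayList<>(chunkSize);
        int chunks = 0;
        for (T item : reader) {
            buffer.add(item);
            if (buffer.size() == chunkSize) {
                writer.accept(List.copyOf(buffer)); // stands in for one write per chunk
                buffer.clear();
                chunks++;
            }
        }
        if (!buffer.isEmpty()) {
            writer.accept(List.copyOf(buffer)); // remaining items form the last chunk
            chunks++;
        }
        return chunks;
    }

    public static void main(String[] args) {
        List<Integer> records = List.of(1, 2, 3, 4, 5, 6, 7);
        int chunks = processInChunks(records, 3, chunk -> System.out.println("write " + chunk));
        System.out.println("chunks written: " + chunks);
    }
}
```

With seven records and a chunk size of three, the writer is called three times: twice with a full chunk and once with the single remaining record.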

Create a new Batch Job Handler

Scenario:

  • The e-commerce system generates inventory files in CSV format every hour and stores them in the Amazon S3 bucket "sample-bucket". The object keys for these files follow the format "samples/inventory-YYYYMMDDHHMMSS.csv".
  • A connector has been implemented to transfer the data from Amazon S3 to Fluent via the Batch API. The process consists of the following steps:
    • Downloading the CSV files from Amazon S3 using a reader component.
    • Parsing the data contained in the CSV files.
    • Loading the parsed data into Fluent via the Batch API using a writer component.
    • Moving the processed files from the original location to an archive location using an ArchiveTasklet.
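As an illustration of the object key format in the scenario, the sketch below builds a key from a timestamp using only the JDK. The class `InventoryKeySketch` and the method `keyFor` are hypothetical and not part of the SDK, and the dates are made-up sample values.

```java
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

/** Illustrative only: builds object keys in the scenario's naming format. */
final class InventoryKeySketch {

    // "YYYYMMDDHHMMSS" from the scenario, expressed as a java.time pattern.
    private static final DateTimeFormatter TIMESTAMP =
            DateTimeFormatter.ofPattern("yyyyMMddHHmmss");

    static String keyFor(LocalDateTime generatedAt) {
        return "samples/inventory-" + generatedAt.format(TIMESTAMP) + ".csv";
    }

    public static void main(String[] args) {
        String first = keyFor(LocalDateTime.of(2023, 10, 9, 13, 0, 0));
        String second = keyFor(LocalDateTime.of(2023, 10, 9, 14, 0, 0));
        System.out.println(first);  // samples/inventory-20231009130000.csv
        System.out.println(second); // samples/inventory-20231009140000.csv
        // Fixed-width timestamps mean string order matches chronological order.
        System.out.println(first.compareTo(second) < 0); // true
    }
}
```

Because the timestamp is fixed-width, sorting keys alphabetically also sorts them from oldest to newest.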

Define Inventory Job Handler:

@Slf4j
@Component
@HandlerInfo(
        name = "InventoryJobHandler",
        route = "inventory-job",
        description = "Sync Inventory from external inventory source file to Fluent platform"
)
public class InventoryJobHandler implements JobHandler {
    // Collaborators assigned in the constructor.
    private final BatchJobBuilder<InventoryLocation> batchJobBuilder;
    private final AmazonS3 amazonS3;
    private final ResourcePatternResolver resourcePatternResolver;

    public InventoryJobHandler(final BatchJobBuilder<InventoryLocation> batchJobBuilder,
                               final AmazonS3 amazonS3, final ApplicationContext applicationContext) {
        this.batchJobBuilder = batchJobBuilder;
        this.amazonS3 = amazonS3;
        // Resolves "s3://" location patterns to Spring resources.
        this.resourcePatternResolver = new PathMatchingSimpleStorageResourcePatternResolver(amazonS3, applicationContext);
    }

    public void run(@NotNull final JobHandlerContext context) throws JobExecutionException {
    ...
    }
}

The code uses the `@HandlerInfo` and `@HandlerProp` annotations to specify the properties of the batch job, such as the name, route, description, and the properties required for the job to run. The job requires the following properties:

Define a job setting on Fluent OMS

  • Key: fc.connect.inventory-file-loader.batch.inventory-job
  • Value:
{
  "props":{
    "location-pattern":"s3://sample-bucket/samples/inventory-*.csv",
    "archive-folder":"s3://sample-bucket/archive/",
    "fields":"locationRef,skuRef,qty",
    "delimiter":",",
    "skip-line":"1",
    "encoding":"UTF-8",
    "chunk-size":"5000",
    "skip-limit":"1",
    "concurrency-limit":"5"
  }
}
  • location-pattern: The location pattern of the CSV files in S3 (e.g. "s3://sample-bucket/samples/inventory-*.csv").
  • archive-folder: The S3 folder to which the processed CSV files are moved (e.g. "s3://sample-bucket/archive/").
  • fields: The fields in the CSV file that should be processed (e.g. "locationRef,skuRef,qty").
  • delimiter: The delimiter used in the CSV file (e.g. ",").
  • skip-line: The number of lines to skip at the beginning of the CSV file (e.g. 1).
  • encoding: The encoding used in the CSV file (e.g. "UTF-8").
  • chunk-size: The number of records processed per chunk (e.g. 5000). If `chunk-size` is set to `100`, the reader reads 100 records at a time and the writer writes those 100 records in a single transaction. The next 100 records are then read and written in another transaction, and so on until all records have been processed. If you do not specify a `chunk-size`, Spring Batch will use the default value of `10`, meaning that 10 records will be processed in a single transaction. A `chunk-size` that is too large can cause performance issues and reduce the overall efficiency of your batch process, while a `chunk-size` that is too small creates too many transactions, which increases overhead and decreases performance. The optimal `chunk-size` depends on the specifics of your application and should be chosen carefully.
  • skip-limit: The maximum number of errors that can be skipped during processing (e.g. 500).
  • concurrency-limit: The concurrency limit for processing the data in the CSV file (e.g. 5).
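The properties above arrive as strings, so a handler typically converts them to typed values before building the job. The following is a minimal sketch under stated assumptions: the class `InventoryJobProps` and its `required` helper are hypothetical names, not part of the SDK, and the fallback of 10 for `chunk-size` simply mirrors the default mentioned in the description above.

```java
import java.nio.charset.Charset;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

/** Hypothetical typed view over the "props" object of the job setting. */
final class InventoryJobProps {
    final String locationPattern;
    final String archiveFolder;
    final List<String> fields;
    final String delimiter;
    final int skipLine;
    final Charset encoding;
    final int chunkSize;
    final int skipLimit;
    final int concurrencyLimit;

    InventoryJobProps(Map<String, String> props) {
        this.locationPattern = required(props, "location-pattern");
        this.archiveFolder = required(props, "archive-folder");
        this.fields = Arrays.asList(required(props, "fields").split(","));
        this.delimiter = required(props, "delimiter");
        this.skipLine = Integer.parseInt(required(props, "skip-line"));
        this.encoding = Charset.forName(required(props, "encoding"));
        // Assumption: fall back to 10, the default the guide mentions, when chunk-size is absent.
        this.chunkSize = Integer.parseInt(props.getOrDefault("chunk-size", "10"));
        this.skipLimit = Integer.parseInt(required(props, "skip-limit"));
        this.concurrencyLimit = Integer.parseInt(required(props, "concurrency-limit"));
    }

    /** Fails fast with a clear message when a property is missing or empty. */
    private static String required(Map<String, String> props, String key) {
        String value = props.get(key);
        if (value == null || value.isEmpty()) {
            throw new IllegalArgumentException("Missing job property: " + key);
        }
        return value;
    }

    public static void main(String[] args) {
        InventoryJobProps props = new InventoryJobProps(Map.of(
                "location-pattern", "s3://sample-bucket/samples/inventory-*.csv",
                "archive-folder", "s3://sample-bucket/archive/",
                "fields", "locationRef,skuRef,qty",
                "delimiter", ",",
                "skip-line", "1",
                "encoding", "UTF-8",
                "chunk-size", "5000",
                "skip-limit", "1",
                "concurrency-limit", "5"));
        System.out.println(props.fields + " chunk-size=" + props.chunkSize);
    }
}
```

Validating the properties up front means a misconfigured setting surfaces as one clear error before any file is read.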

Resolve resources:

Resource[] resources;
try {
    // Resolve every resource that matches the configured location pattern.
    resources = this.resourcePatternResolver
            .getResources(settings.getProp(LOCATION_PATTERN, StringUtils.EMPTY));
    // Sort by file name so that resources are read in a predictable order.
    Arrays.sort(resources, Comparator.comparing(Resource::getFilename));
} catch (IOException ex) {
    throw new RuntimeException(ex);
}

This is an example of an S3 resource resolver. A `Reader` processes resources in the order in which they appear in the `resources` array.
In this example, the resources are sorted using the `Arrays.sort` method and a `Comparator` that orders them by file name. The reader reads each resource in the order it appears in the array, processing all records in one resource before moving on to the next.
Partners can resolve any kind of resource provided by Spring, or a custom one (refer to 6. Resources).
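One detail worth noting when sorting by file name: Spring's `Resource.getFilename()` may return null for resource types that have no file name, and a plain natural-order comparison would then fail with a `NullPointerException`. The sketch below shows a null-tolerant ordering using plain strings as a stand-in for resources. The class `ResourceOrderSketch` and the file names are hypothetical.

```java
import java.util.Arrays;
import java.util.Comparator;

/** Illustrative only: file-name ordering that tolerates missing names. */
final class ResourceOrderSketch {

    /** Returns a sorted copy; entries without a file name (null) go last. */
    static String[] inProcessingOrder(String... filenames) {
        String[] sorted = filenames.clone();
        Arrays.sort(sorted, Comparator.nullsLast(Comparator.<String>naturalOrder()));
        return sorted;
    }

    public static void main(String[] args) {
        String[] ordered = inProcessingOrder(
                "samples/inventory-20231009140000.csv",
                null,
                "samples/inventory-20231009130000.csv");
        System.out.println(Arrays.toString(ordered));
    }
}
```

With real resources, the same idea can be expressed as `Comparator.comparing(Resource::getFilename, Comparator.nullsLast(Comparator.naturalOrder()))`.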

Example:

Resource[] resources = resourcePatternResolver.getResources("file:/foo/bar/**");

Resource[] resources = resourcePatternResolver.getResources("s3://foo/bar/**");

Resource[] resources = resourcePatternResolver.getResources("http://foo.com/bar/**");

Note: Because Fluent's Batch API is asynchronous, it cannot guarantee the order in which resources are processed: resource 2 may execute before resource 1 is persisted. To preserve data integrity, make sure the data is unique across resources, or process only one resource at a time.
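One way to check the uniqueness recommended in the note is to look for record keys that occur in more than one resource before loading. This is only a sketch under assumptions: the class `UniqueKeySketch`, the "locationRef|skuRef" key format, and the sample file names are all hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Illustrative only: finds record keys that appear in more than one resource. */
final class UniqueKeySketch {

    static Set<String> keysInSeveralResources(Map<String, List<String>> keysByResource) {
        // Invert the input: for each record key, collect the resources that contain it.
        Map<String, Set<String>> resourcesByKey = new TreeMap<>();
        keysByResource.forEach((resource, keys) ->
                keys.forEach(key ->
                        resourcesByKey.computeIfAbsent(key, k -> new TreeSet<>()).add(resource)));

        Set<String> shared = new TreeSet<>();
        resourcesByKey.forEach((key, resources) -> {
            if (resources.size() > 1) {
                shared.add(key);
            }
        });
        return shared;
    }

    public static void main(String[] args) {
        Set<String> shared = keysInSeveralResources(Map.of(
                "inventory-20231009130000.csv", List.of("LOC_1|SKU_A", "LOC_1|SKU_B"),
                "inventory-20231009140000.csv", List.of("LOC_1|SKU_A")));
        System.out.println(shared);
    }
}
```

If the returned set is not empty, the affected resources could be processed one at a time instead of together.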

Define the Reader:

final var reader = new CsvItemReader<>(fields, delimiter, encoding, skipLine, InventoryLocation.class);

`CsvItemReader` is built on top of `FlatFileItemReader`.
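To show what the reader parameters mean in practice, here is a simplified plain-Java sketch of delimited parsing. It is not the `CsvItemReader` implementation: the class `CsvReadSketch` is hypothetical, the sample rows are made up, and quoted values containing the delimiter are deliberately not handled.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;

/** Illustrative only: maps delimited lines to field names after skipping header lines. */
final class CsvReadSketch {

    static List<Map<String, String>> read(String csv, String[] fields, String delimiter, int skipLines) {
        List<Map<String, String>> rows = new ArrayList<>();
        String[] lines = csv.split("\\R"); // split on any line terminator
        for (int i = skipLines; i < lines.length; i++) {
            if (lines[i].isBlank()) {
                continue;
            }
            // Quote the delimiter so characters such as "|" are not treated as regex syntax.
            String[] values = lines[i].split(Pattern.quote(delimiter), -1);
            if (values.length != fields.length) {
                throw new IllegalArgumentException("Line " + (i + 1) + ": expected "
                        + fields.length + " values but found " + values.length);
            }
            Map<String, String> row = new LinkedHashMap<>();
            for (int f = 0; f < fields.length; f++) {
                row.put(fields[f], values[f].trim());
            }
            rows.add(row);
        }
        return rows;
    }

    public static void main(String[] args) {
        String csv = "locationRef,skuRef,qty\nLOC_1,SKU_A,10\nLOC_2,SKU_B,0\n";
        String[] fields = "locationRef,skuRef,qty".split(",");
        // skip-line = 1 drops the header row.
        System.out.println(read(csv, fields, ",", 1));
    }
}
```

Here `fields` supplies the column names, `delimiter` splits each line, and `skipLines` plays the role of `skip-line`.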

Define the Writer:

final var writer = new FluentInventoryItemWriter(context, response.getId());

`FluentInventoryItemWriter` is an implementation of `ItemWriter`.

Define the process step:

final var loadStep = batchJobBuilder.buildProcessStep(LOAD_STEP, resources, reader, writer, chunkSize, skipLimit, concurrencyLimit);

Define a Tasklet to move/archive files to another location on Amazon S3:

import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3URI;
import org.springframework.batch.core.StepContribution;
import org.springframework.batch.core.scope.context.ChunkContext;
import org.springframework.batch.core.step.tasklet.Tasklet;
import org.springframework.batch.repeat.RepeatStatus;
import org.springframework.core.io.Resource;
// SimpleStorageResource and @NotNull must also be imported; their packages
// depend on the library versions used by the project.

public class S3ArchiveTasklet implements Tasklet {

    private final AmazonS3 amazonS3;
    private final Resource[] resources;
    private final String archiveFolder;

    public S3ArchiveTasklet(final AmazonS3 amazonS3, @NotNull Resource[] resources, @NotNull String archiveFolder) {
        this.amazonS3 = amazonS3;
        this.resources = resources;
        this.archiveFolder = archiveFolder;
    }

    @Override
    public RepeatStatus execute(final @NotNull StepContribution stepContribution,
                                final @NotNull ChunkContext chunkContext) {
        final var archiveFolderURI = new AmazonS3URI(archiveFolder);

        for (Resource resource : resources) {
            SimpleStorageResource s3Resource = (SimpleStorageResource) resource;
            final var sourceURI = new AmazonS3URI(s3Resource.getS3Uri());
            // The archived object keeps its original key, prefixed with the archive folder.
            final var destinationKey = archiveFolderURI.getKey() + sourceURI.getKey();

            // S3 has no move operation: copy to the archive location, then delete the source.
            amazonS3.copyObject(sourceURI.getBucket(), sourceURI.getKey(), archiveFolderURI.getBucket(), destinationKey);
            amazonS3.deleteObject(sourceURI.getBucket(), sourceURI.getKey());
        }

        return RepeatStatus.FINISHED;
    }
}
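To see which destination key the tasklet produces, the sketch below reproduces the key concatenation with `java.net.URI` standing in for `AmazonS3URI`. The class `ArchiveKeySketch` and the sample file name are hypothetical, and the stand-in only works for bucket names that are valid host names.

```java
import java.net.URI;

/** Illustrative only: mirrors the tasklet's destination-key concatenation with JDK types. */
final class ArchiveKeySketch {

    /** The bucket is the authority part of an "s3://bucket/key" URI. */
    static String bucketOf(String s3Uri) {
        return URI.create(s3Uri).getHost();
    }

    /** The key is the path without its leading slash. */
    static String keyOf(String s3Uri) {
        String path = URI.create(s3Uri).getPath();
        return path.startsWith("/") ? path.substring(1) : path;
    }

    /** Archive folder key followed by the full source key. */
    static String destinationKey(String archiveFolder, String sourceUri) {
        return keyOf(archiveFolder) + keyOf(sourceUri);
    }

    public static void main(String[] args) {
        String archiveFolder = "s3://sample-bucket/archive/";
        String source = "s3://sample-bucket/samples/inventory-20231009130000.csv";
        System.out.println(bucketOf(archiveFolder));               // sample-bucket
        System.out.println(destinationKey(archiveFolder, source)); // archive/samples/inventory-20231009130000.csv
    }
}
```

Note that the source prefix ("samples/") is preserved under the archive folder, so archived files end up under "archive/samples/".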

Define the archiveTasklet:

final var archiveTasklet = new S3ArchiveTasklet(amazonS3, resources, archiveFolder);

Define the archiveStep:

final var archiveStep = batchJobBuilder.buildCustomStep(ARCHIVE_STEP, archiveTasklet);

Launch the Batch Job:

batchJobBuilder.launchJob(UUID.randomUUID().toString(), JOB_NAME, loadStep, archiveStep);

Parameters:

  • id: The ID of the batch job. By default, Spring Batch can detect that a job instance with the same parameters has already been executed or completed.
  • name: The name of the batch job.
  • steps: Accepts a variable number of steps and executes them in the order given. Examples:
    • batchJobBuilder.launchJob(id, name, prepareStep, loadStep, archiveStep): executes 3 steps in the order prepareStep > loadStep > archiveStep
    • batchJobBuilder.launchJob(id, name, loadStep, archiveStep): executes 2 steps in the order loadStep > archiveStep
    • batchJobBuilder.launchJob(id, name, loadStep): executes only 1 step

Trigger Job

curl -X PUT http://localhost:8080/api/v1/fluent-connect/scheduler/add/<ACCOUNT>/<RETAILER>/inventory-job

Example of the curl command with the values replaced:

curl -X PUT http://localhost:8080/api/v1/fluent-connect/scheduler/add/cnctdev/34/inventory-job
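The same trigger can be issued from Java with the JDK's built-in HTTP client (Java 11 or later). This is a sketch, not part of the SDK: the class `TriggerJobSketch` and the method `buildRequest` are hypothetical, and the account and retailer values are the ones from the curl example. Running `main` assumes the connector is listening on localhost:8080.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Illustrative only: sends the same PUT request as the curl command. */
final class TriggerJobSketch {

    /** Builds a body-less PUT request for the scheduler endpoint. */
    static HttpRequest buildRequest(String baseUrl, String account, String retailer, String route) {
        URI uri = URI.create(String.format(
                "%s/api/v1/fluent-connect/scheduler/add/%s/%s/%s", baseUrl, account, retailer, route));
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) throws Exception {
        HttpRequest request = buildRequest("http://localhost:8080", "cnctdev", "34", "inventory-job");
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```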