
Weather Updates

How-to Guide

Author:

Fluent Commerce

Changed on:

20 Nov 2023

Key Points

  • In this guide, we will go through some basic features of the SDK by creating a connector from scratch that can receive or retrieve weather updates and push them to a Fluent setting. 
  • If you wish to bypass this guide and look at the result, feel free to download the sources.

Steps

Overview

These are the topics covered by this guide:

  • Creating a new connector project
  • Adding a new SQS Queue listener
  • How to map an external system to a Fluent retailer
  • How to process a message received
  • How to determine the correct message route
  • Receiving and processing messages from the SQS listener
  • How to read a property from Fluent
  • Code generation from GraphQL files
  • How to upsert (create or update) a setting in Fluent
  • Adding a job and executing it
  • Opening an HTTP endpoint that can receive inputs and queue them to be processed

Security is not covered here, especially when opening a new endpoint. The determination of the Fluent account has also been simplified for this demo.


Prerequisites

  • localstack container is running
  • You have access to a Fluent account

The project bundles a script, `localstack-setup.sh`, that can be used to set up localstack. The script requires some parameters to be configured before it can be executed.

It is best to do the following. First, run this command to open a session with the localstack container:

docker exec -it localstack /bin/bash


Then use the commands below to create the secrets, making sure to set the variables $ACCOUNT, $RETAILER, $USERNAME, $PASSWORD and $REGION first. Valid region values are: sydney, dublin, singapore or north_america.

awslocal secretsmanager create-secret --name fc/connect/weather-update-demo/api/fluent/activeAccounts --secret-string "{\"accounts\":[{\"name\":\"$ACCOUNT\", \"region\": \"$REGION\", \"retailers\":[$RETAILER]}]}" ;
awslocal secretsmanager create-secret --name fc/connect/weather-update-demo/$ACCOUNT/api/fluent-account/$RETAILER --secret-string "{\"retailer\":\"1\", \"userName\":\"$USERNAME\", \"password\":\"$PASSWORD\"}";


Press Ctrl+D to exit the localstack session.
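
To make the structure of the two secrets easier to see, here is a minimal Java sketch that assembles the same JSON strings. The class `DemoSecrets`, its method names, and the sample values (`ACME`, `demo-user`, `demo-password`) are assumptions for illustration only. They are not part of the SDK or of `localstack-setup.sh`, and the sketch does not escape special characters in its inputs.

```java
// Sketch only: mirrors the JSON shape of the two secrets created above.
// DemoSecrets and its methods are hypothetical helpers, not SDK classes.
final class DemoSecrets {

    private DemoSecrets() {
    }

    // Value of the secret fc/connect/weather-update-demo/api/fluent/activeAccounts.
    static String activeAccounts(final String account, final String region, final int retailer) {
        return String.format(
                "{\"accounts\":[{\"name\":\"%s\", \"region\": \"%s\", \"retailers\":[%d]}]}",
                account, region, retailer);
    }

    // Value of the per-retailer credentials secret.
    // The guide's command stores the literal "1" under "retailer", so the sketch does the same.
    static String retailerCredentials(final String userName, final String password) {
        return String.format(
                "{\"retailer\":\"1\", \"userName\":\"%s\", \"password\":\"%s\"}",
                userName, password);
    }

    public static void main(final String[] args) {
        System.out.println(activeAccounts("ACME", "sydney", 1));
        System.out.println(retailerCredentials("demo-user", "demo-password"));
    }
}
```

Printing the strings before creating the secrets is a quick way to check that every key is quoted correctly.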

Creating the project

Follow the SDK guide to create a new project - Creating a new connector with the SDK

Setting up the IDE

Open the Maven project in your preferred IDE and create a `docker-compose-localstack.yml` file to start localstack.

version: "3.9"
services:
  localstack:
    container_name: localstack
    image: localstack/localstack
    restart: unless-stopped
    ports:
      - "4510-4559:4510-4559"  # external service port range
      - "4566:4566"
    environment:
      - AWS_DEFAULT_REGION=us-east-1
      - SERVICES=sqs,secretsmanager
      - HOSTNAME=localstack
      - HOSTNAME_EXTERNAL=localstack
    volumes:
      - ./docker/localstack:/docker-entrypoint-initaws.d


With the IDE open and localstack running, follow the SDK setup steps.

The essential steps are now complete, so let's start working on the weather demo project. Add the variable below to the "Run Configuration" environment variables, as shown in the screenshot. `SQS_DEMO` contains the actual name of the SQS queue that we will use in this sample project.

SQS_DEMO=demo

[Screenshot: "Run Configuration" environment variables]

All initial required steps have been configured, and it is possible to start the application.

Adding a new queue listener

It's time to start coding.

Queue configuration

The first step in creating a listener is creating its configuration, as shown below. See the SDK configuration steps for further details. The screenshot below shows two files. These are two variants of `application.yaml`, each with a Spring profile suffix: `-dev` and `-connector`. Although the same result can be achieved with a single file, the split illustrates that a queue can be bound to different Fluent accounts in different deployment environments.

These are the key properties to create:

  • `demo-weather`: the listener ID, which must be unique. Keep the ID `demo-weather`, as it is referenced elsewhere in this sample.
  • `name`: the name of the environment variable that holds the actual queue name. Using an environment variable lets a container receive the queue name as a parameter at deployment time, so different deployment environments can use different queue names.
  • `type`: must be `sqs`, as this project uses `connect-sdk-core-aws` as an example.
  • `fluent-account`: the SDK requires that a queue is bound to a Fluent account.
[Screenshot: listener configuration in the `-dev` and `-connector` application files]
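
The role of the `name` property can be illustrated with a small sketch: the configuration stores the name of an environment variable, and the actual queue name is read from the environment at start-up. `QueueNameResolver` is a hypothetical helper written for this illustration. It is not how the SDK itself performs the lookup.

```java
import java.util.Map;
import java.util.Optional;

// Sketch only: shows the idea behind the `name` property, not the SDK's own lookup code.
final class QueueNameResolver {

    private QueueNameResolver() {
    }

    // Returns the queue name held by the given environment variable, if it is set and not blank.
    static Optional<String> resolve(final String variableName, final Map<String, String> environment) {
        return Optional.ofNullable(environment.get(variableName))
                .filter(value -> !value.isBlank());
    }

    public static void main(final String[] args) {
        // With SQS_DEMO=demo in the run configuration, this prints "demo".
        System.out.println(resolve("SQS_DEMO", System.getenv()).orElse("<SQS_DEMO is not set>"));
    }
}
```

Passing the environment in as a map keeps the lookup easy to exercise with different deployment values.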

Queue listener

With the listener configuration out of the way, let's create some Java classes. Create a new package `custom` under `com.fluentcommerce.connect`, and use this package for the classes of our sample/demo project.

Queue Message

WeatherData will be the only data type (DTO) we will use in the demo.

package com.fluentcommerce.connect.custom;

/**
 *
 * @param id name of the event/message
 * @param code weather code identification
 * @param temperature the weather temperature
 * @param description weather description
 */
public record WeatherData(String id, String code, String temperature, String description) {}

Queue listener

`DemoQueueListener` is the component that subscribes to the SQS queue and receives the messages. This class doesn't process the message. Instead, it validates that a route can be found to process it and determines the retailer the message will be processed with. Note that this example works with only one retailer.

The class below uses the fixed values `SAMPLE_FIXED_ID` and `EXTERNAL_ID` for the Fluent account mapping. Account mapping is the process by which a received message can belong to one or another retailer of the same account: based on parameters carried by the message, the SDK finds the correct retailer to use. Again, this example works with a single retailer, and the message received doesn't carry that information.

In the code example below, the method `getMessageRoute` returns a message route based on the information received in the message. The route configuration is covered later in this section: the `id` property received in the message needs to match the configuration property `name`.

package com.fluentcommerce.connect.custom;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fluentcommerce.connect.core.config.data.RouterMappingSettings;
import com.fluentcommerce.connect.core.configuration.data.AccountReference;
import com.fluentcommerce.connect.core.exception.UnprocessableMessageException;
import com.fluentcommerce.connect.core.listener.data.ConnectSDKMessage;
import com.fluentcommerce.connect.core.listener.data.ExternalMessage;
import com.fluentcommerce.connect.core.listener.external.BaseExternalListener;
import com.fluentcommerce.connect.core.listener.external.ListenerInfo;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component;

import java.util.Collections;
import java.util.Optional;
import java.util.UUID;

@Slf4j
@Component
@ListenerInfo(id = "demo-weather", messageClass = WeatherData.class)
public class DemoQueueListener extends BaseExternalListener<WeatherData> {

    //These values identify your external system and are used to find a Fluent account-retailer during the account map lookup
    //The queue is fixed to a Fluent account (as defined at application-dev.yml), but the retailer is flexible
    private static final String SAMPLE_FIXED_ID = "fluent-ct-dev-2";
    private static final String EXTERNAL_ID = "projectKey";

    @Override
    public @NotNull Optional<ConnectSDKMessage> receiveMessage(@NotNull final ExternalMessage message) throws UnprocessableMessageException {
        final WeatherData weatherData = getMessageContent(message);
        final Optional<AccountReference> accountReference = getFluentAccountReference(Collections.singletonMap(EXTERNAL_ID, SAMPLE_FIXED_ID));
        final Optional<String> messageRoute = getMessageRoute(weatherData);

        if (accountReference.isPresent() && messageRoute.isPresent()) {
            try {
                return Optional.of(ConnectSDKMessage.builder()
                        .id(UUID.randomUUID())
                        .name(messageRoute.get())
                        .accountId(accountReference.get().accountId())
                        .retailerId(accountReference.get().retailerId())
                        .payload(ConnectSDKMessage.toJson(weatherData))
                        .build());
            } catch (final JsonProcessingException e) {
                throw new UnprocessableMessageException(e);
            }
        } else {
            log.warn("Unable to determine account [{}] or route [{}].", accountReference, messageRoute);
        }
        return Optional.empty();
    }

    /**
     * Finds a message route based on the name property as defined by the route mappings at application-connector.yml
     */
    private Optional<String> getMessageRoute(final WeatherData weatherData) {
        return getRouteMappings().stream()
                .filter(route -> weatherData.id().equalsIgnoreCase(route.getProp("id", StringUtils.EMPTY)))
                .findFirst().map(RouterMappingSettings::getRoute);
    }
}

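
The lookup performed by `getMessageRoute` can be tried out without the SDK. In the sketch below, `RouteMapping` is a hypothetical stand-in for the SDK's `RouterMappingSettings`, and the mapping entry (the `id` value `weather-update-message` pointing at the route `fc.connect.sample-weather-update`) is an assumption chosen to match the test message and route used elsewhere in this guide.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch only: RouteMapping is a hypothetical stand-in for the SDK's RouterMappingSettings.
final class RouteLookupSketch {

    record RouteMapping(String route, Map<String, String> props) {
        String getProp(final String key, final String defaultValue) {
            return props.getOrDefault(key, defaultValue);
        }
    }

    private RouteLookupSketch() {
    }

    // Returns the route of the first mapping whose "id" prop equals the message id, ignoring case.
    static Optional<String> findRoute(final String messageId, final List<RouteMapping> mappings) {
        return mappings.stream()
                .filter(mapping -> messageId.equalsIgnoreCase(mapping.getProp("id", "")))
                .findFirst()
                .map(RouteMapping::route);
    }

    public static void main(final String[] args) {
        final List<RouteMapping> mappings = List.of(
                new RouteMapping("fc.connect.sample-weather-update", Map.of("id", "weather-update-message")));
        System.out.println(findRoute("weather-update-message", mappings));
        System.out.println(findRoute("unknown-message", mappings));
    }
}
```

When no mapping matches, the result is empty, which is the case where the listener logs a warning and drops the message.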

For the Fluent account mapping to work, we need to configure a new setting on the retailer used during the localstack credential setup. The setting key has to follow the pattern `fc.connect.<connector-name>.account-mapping`. The value of `connector-name` is the name given to the connector when creating the project. It can also be found in `application-connector.yml`.

Configuration template (setting name at the Retailer context in Fluent: `fc.connect.<connector-name>.account-mapping`):

{
    "fluent": {
        "accountId": "<FLUENT_ACCOUNT>",
        "retailerId": "<RETAILER_ID>"
    },
    "externalAccount": {
        "<EXTERNAL_KEY>": "<EXTERNAL_VALUE>"
    }
}


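For our demo, set the external account to match the constants used by `DemoQueueListener`: the key `projectKey` (`EXTERNAL_ID`) with the value `fluent-ct-dev-2` (`SAMPLE_FIXED_ID`).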
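
The account lookup can be pictured as matching the external key/value pairs sent by the listener against the `externalAccount` block of each mapping. The sketch below is a plain-Java illustration under that assumption. `AccountMapping` and the nested `AccountReference` record are hypothetical types defined here, not the SDK's classes, and the account `CNCTDEV` and retailer `34` are only sample values.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Sketch only: illustrates the account-mapping idea with local, hypothetical types.
final class AccountMappingSketch {

    record AccountReference(String accountId, String retailerId) {}

    record AccountMapping(AccountReference fluent, Map<String, String> externalAccount) {}

    private AccountMappingSketch() {
    }

    // Returns the Fluent account/retailer of the first mapping that contains every external key/value pair.
    static Optional<AccountReference> find(final Map<String, String> externalKeys,
                                           final List<AccountMapping> mappings) {
        return mappings.stream()
                .filter(mapping -> externalKeys.entrySet().stream()
                        .allMatch(entry -> entry.getValue().equals(mapping.externalAccount().get(entry.getKey()))))
                .findFirst()
                .map(AccountMapping::fluent);
    }

    public static void main(final String[] args) {
        final List<AccountMapping> mappings = List.of(
                new AccountMapping(new AccountReference("CNCTDEV", "34"), Map.of("projectKey", "fluent-ct-dev-2")));
        System.out.println(find(Map.of("projectKey", "fluent-ct-dev-2"), mappings));
    }
}
```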

The last step is to configure a route mapping. This feature allows your listener to choose which route to use for an incoming message based on the mapping defined. This is useful when your listener receives different types of messages that must be processed differently. For example, your listener might receive both weather updates and weather warning updates. By setting up a routing table (also known as a route mapping) that forwards these two message types to two different handlers, you can give each handler a very specific task, which helps with the maintenance, evolution, and reusability of your handlers.

[Screenshot: route mapping configuration]
Queue Message Handler

When working with message handlers, the route to a handler is specified by the `HandlerInfo` annotation: `route = "fc.connect.sample-weather-update"`. The route tells the SDK to forward any message with the given route to a particular handler, so that your logic can be applied to process the message.

package com.fluentcommerce.connect.custom;

import com.fluentcommerce.connect.core.exception.HandlerRetryException;
import com.fluentcommerce.connect.core.exception.UnprocessableMessageException;
import com.fluentcommerce.connect.core.handler.HandlerInfo;
import com.fluentcommerce.connect.core.handler.context.MessageHandlerContext;
import com.fluentcommerce.connect.core.listener.router.MessageHandler;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Slf4j
@Component
// The route matches the one described above and the one published by the HTTP endpoint later in this guide
@HandlerInfo(name = "location-weather-update", route = "fc.connect.sample-weather-update")
public class WeatherDataMessageHandler implements MessageHandler {
    @Override
    public void processMessage(@NotNull final MessageHandlerContext context) throws UnprocessableMessageException, HandlerRetryException {
        UpsertSettingUtils.upsertSetting(getPayload(context, WeatherData.class), context);
    }

    // Extracts the typed payload, or rejects the message when it cannot be read
    protected <T> T getPayload(final MessageHandlerContext context, final Class<T> messageClass) {
        final Optional<T> payload = context.getMessagePayload(messageClass);
        if (payload.isPresent()) {
            return payload.get();
        }
        throw new UnprocessableMessageException("Unable to get message payload");
    }
}

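
How a route selects a handler can be sketched as a simple registry keyed by route name. This is only a conceptual illustration. `HandlerRegistrySketch` is a hypothetical class, the SDK's real dispatching is driven by the `HandlerInfo` annotation, and the second route name (`fc.connect.sample-weather-warning`) is invented for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Sketch only: a minimal route-to-handler registry, not the SDK's dispatcher.
final class HandlerRegistrySketch {

    private final Map<String, Consumer<String>> handlersByRoute = new HashMap<>();

    void register(final String route, final Consumer<String> handler) {
        handlersByRoute.put(route, handler);
    }

    // Forwards the payload to the handler registered for the route; returns false when none exists.
    boolean dispatch(final String route, final String payload) {
        final Consumer<String> handler = handlersByRoute.get(route);
        if (handler == null) {
            return false;
        }
        handler.accept(payload);
        return true;
    }

    public static void main(final String[] args) {
        final List<String> processed = new ArrayList<>();
        final HandlerRegistrySketch registry = new HandlerRegistrySketch();
        registry.register("fc.connect.sample-weather-update", payload -> processed.add("update: " + payload));
        // Hypothetical second route, showing that each message type gets its own handler.
        registry.register("fc.connect.sample-weather-warning", payload -> processed.add("warning: " + payload));

        registry.dispatch("fc.connect.sample-weather-update", "{\"code\":\"300\"}");
        System.out.println(processed);
    }
}
```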

Utility class

UpsertSettingUtils class is used by both the message handler and job in our example to connect to Fluent and upsert a setting. The setting name is hardcoded below a 

`FLUENT_SETTING_KEY = "fc.connect.weather-update-demo.current-weather"`
.

package com.fluentcommerce.connect.custom;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fluentcommerce.connect.core.exception.UnprocessableMessageException;
import com.fluentcommerce.connect.core.handler.context.HandlerContext;
import com.fluentcommerce.connect.core.utils.ConversionUtils;
import com.fluentcommerce.graphql.queries.settings.CreateSettingMutation;
import com.fluentcommerce.graphql.queries.settings.UpdateSettingMutation;
import com.fluentcommerce.graphql.type.CreateSettingInput;
import com.fluentcommerce.graphql.type.UpdateSettingInput;
import com.fluentcommerce.util.core.SettingUtils;
import com.fluentretail.api.exception.FluentApiException;
import org.apache.commons.lang3.StringUtils;
import org.jetbrains.annotations.NotNull;

import java.util.Collections;
import java.util.Map;

public class UpsertSettingUtils {
    private static final String FLUENT_SETTING_KEY = "fc.connect.weather-update-demo.current-weather";

    private UpsertSettingUtils() {
    }

    // Updates the setting when it already exists in Fluent, otherwise creates it
    public static void upsertSetting(final WeatherData weatherData, final HandlerContext context) {
        final var settings = SettingUtils.getSettings(context.ofFluentContext(), Collections.singletonMap(FLUENT_SETTING_KEY, FLUENT_SETTING_KEY));
        if (settings.containsKey(FLUENT_SETTING_KEY) && StringUtils.isNotBlank(settings.get(FLUENT_SETTING_KEY).getId())) {
            update(settings.get(FLUENT_SETTING_KEY).getId(), weatherData, context);
        } else {
            create(weatherData, context);
        }
    }

    private static void create(final WeatherData weatherData, final HandlerContext context) {
        final CreateSettingMutation mutation;
        try {
            mutation = CreateSettingMutation.builder()
                    .input(CreateSettingInput.builder()
                            .name(FLUENT_SETTING_KEY)
                            .valueType("JSON")
                            .lobValue(convertObject(ConversionUtils.getObjectMapper().writeValueAsString(weatherData)))
                            .context("RETAILER")
                            .contextId(Integer.parseInt(context.getAccountReference().retailerId()))
                            .build())
                    .build();
        } catch (JsonProcessingException e) {
            throw new UnprocessableMessageException(e);
        }
        final CreateSettingMutation.Data result = context.ofFluentContext().executeMutation(mutation, CreateSettingMutation.Data.class);
        if (result == null || result.createSetting() == null || StringUtils.isBlank(result.createSetting().id())) {
            throw new FluentApiException(0, "Fluent mutation failed.");
        }
    }

    private static void update(final String id, final WeatherData weatherData, final HandlerContext context) {
        final UpdateSettingMutation mutation;
        try {
            mutation = UpdateSettingMutation.builder()
                    .input(UpdateSettingInput.builder()
                            .id(id)
                            .name(FLUENT_SETTING_KEY)
                            .valueType("JSON")
                            .lobValue(convertObject(ConversionUtils.getObjectMapper().writeValueAsString(weatherData)))
                            .context("RETAILER")
                            .contextId(Integer.parseInt(context.getAccountReference().retailerId()))
                            .build())
                    .build();
        } catch (JsonProcessingException e) {
            throw new UnprocessableMessageException(e);
        }
        final UpdateSettingMutation.Data result = context.ofFluentContext().executeMutation(mutation, UpdateSettingMutation.Data.class);
        if (result == null || result.updateSetting() == null) {
            throw new FluentApiException(0, "Fluent mutation failed.");
        }
    }

    // Parses the JSON string into a Map so it can be sent as the setting's lobValue
    private static Map convertObject(@NotNull final String value) throws JsonProcessingException {
        return ConversionUtils.getObjectMapper().readValue(value, Map.class);
    }
}


`UpsertSettingUtils` makes GraphQL mutation calls to Fluent, so let's create some GraphQL files and have the SDK generate their stubs. You should be able to find a `graphql` folder under the `main` folder that contains the GraphQL schema (`schema.json`). The other two files highlighted need to be created manually for our demo, as follows:

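[Screenshot: `graphql` folder with `schema.json` and the two highlighted files, `CreateSetting.graphql` and `UpdateSetting.graphql`]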

GraphQL

  • CreateSetting.graphql
mutation CreateSetting($input:CreateSettingInput!){
    createSetting(input: $input){
        id
        name
    }
}

  • UpdateSetting.graphql
mutation updateSetting($input:UpdateSettingInput!){
    updateSetting(input: $input){
        id
    }
}


Before running the Maven compile again, first uncomment the `apollo-client-maven-plugin` plugin in `pom.xml`. It is disabled by default because the build would fail without any GraphQL queries to generate code from. Now run the Maven compile: the code generation creates the necessary stubs, and the code should compile with all errors resolved.
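
The create-or-update decision made by `UpsertSettingUtils.upsertSetting` can be seen in isolation with an in-memory stand-in for the Fluent settings store. `UpsertSketch` and its `Setting` record are hypothetical and exist only for this illustration. The real class issues the GraphQL mutations instead of writing to a map.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

// Sketch only: an in-memory stand-in for the settings store, showing the upsert branch logic.
final class UpsertSketch {

    record Setting(String id, String name, String value) {}

    private final Map<String, Setting> settingsByName = new HashMap<>();

    // Updates the setting when one with a non-blank id exists, otherwise creates it.
    // Returns "updated" or "created" so the chosen branch is visible to the caller.
    String upsert(final String name, final String value) {
        final Setting existing = settingsByName.get(name);
        if (existing != null && !existing.id().isBlank()) {
            settingsByName.put(name, new Setting(existing.id(), name, value));
            return "updated";
        }
        settingsByName.put(name, new Setting(UUID.randomUUID().toString(), name, value));
        return "created";
    }

    Setting get(final String name) {
        return settingsByName.get(name);
    }

    public static void main(final String[] args) {
        final UpsertSketch store = new UpsertSketch();
        final String key = "fc.connect.weather-update-demo.current-weather";
        System.out.println(store.upsert(key, "{\"code\":\"300\"}"));
        System.out.println(store.upsert(key, "{\"code\":\"800\"}"));
    }
}
```

The second call keeps the ID generated by the first one, which mirrors why the update mutation needs the existing setting's `id`.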

Testing the queue

It is now possible to start the application and test the queue. When the application starts, the SDK auto-creates all necessary queues and immediately starts listening for new messages. To publish a message to the demo connector, run the command below. The message simulates sending a weather update to the connector, which then updates Fluent with that weather update.

docker exec -d localstack awslocal --endpoint-url=http://localhost:4566 sqs send-message --queue-url http://localstack:4566/000000000000/demo --message-body "{   \"id\": \"weather-update-message\", \"code\": \"300\",  \"temperature\": \"22\",  \"description\": \"Sunny Day\"}" ;


To validate that it works, check the logs of the demo connector, and check the Fluent settings by searching for `fc.connect.weather-update-demo.current-weather`. The setting should contain the same weather update as the message above.

Adding a job handler

The next step is to add a job handler that pretends to retrieve weather updates from an external system. In this example, it returns a random update from a predefined list.

package com.fluentcommerce.connect.custom;

import com.fluentcommerce.connect.core.exception.JobExecutionException;
import com.fluentcommerce.connect.core.handler.HandlerInfo;
import com.fluentcommerce.connect.core.handler.context.JobHandlerContext;
import com.fluentcommerce.connect.core.job.extend.JobHandler;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Random;

@Slf4j
@Component
@HandlerInfo(name = "weather-update-job", route = "weather-update-job")
public class WeatherUpdateJobHandler implements JobHandler {
    @Override
    public void run(@NotNull final JobHandlerContext context) throws JobExecutionException {
        UpsertSettingUtils.upsertSetting(getWeatherReport(), context);
    }

    // Simulates an external system by picking a random entry from the predefined list
    private WeatherData getWeatherReport() {
        final var random = new Random();
        return weatherCodeMap.get(random.nextInt(weatherCodeMap.size()));
    }

    private static final List<WeatherData> weatherCodeMap = List.of(
            new WeatherData("weather-update", "201", "12", "Thunderstorm with rain"),
            new WeatherData("weather-update", "301", "12", "Drizzle"),
            new WeatherData("weather-update", "500", "19", "Light Rain"),
            new WeatherData("weather-update", "502", "24", "Heavy Rain"),
            new WeatherData("weather-update", "521", "23", "Shower rain"),
            new WeatherData("weather-update", "600", "14", "Light snow"),
            new WeatherData("weather-update", "601", "15", "Snow"),
            new WeatherData("weather-update", "602", "16", "Heavy Snow"),
            new WeatherData("weather-update", "711", "15", "Smoke"),
            new WeatherData("weather-update", "741", "21", "Fog"),
            new WeatherData("weather-update", "800", "28", "Clear sky"),
            new WeatherData("weather-update", "801", "25", "Few clouds"),
            new WeatherData("weather-update", "802", "23", "Scattered clouds"),
            new WeatherData("weather-update", "803", "21", "Broken clouds"),
            new WeatherData("weather-update", "804", "20", "Overcast clouds"));
}


The SDK doesn't have an internal scheduler, so it relies on external systems to trigger job/batch execution. In AWS deployments, for example, we recommend using EventBridge, which calls a given URL on a schedule. It's time to run the job. To simulate that behavior, run the command below, but first update the URL with the account and retailer. Use the account and retailer as configured in `localstack-setup.sh`.

curl -X PUT http://localhost:8080/api/v1/fluent-connect/scheduler/add/<ACCOUNT>/<RETAILER>/weather-update-job


Example of the curl command with the values replaced:

curl -X PUT http://localhost:8080/api/v1/fluent-connect/scheduler/add/cnctdev/34/weather-update-job

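
An external trigger written in Java could build the same request with the JDK's `java.net.http` API. The sketch below only constructs the request. `JobTriggerSketch` is a hypothetical class name, and the base URL, account, and retailer are the sample values from the command above.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch only: builds the PUT request that the curl command above sends.
final class JobTriggerSketch {

    private JobTriggerSketch() {
    }

    // Assumes account, retailer and job contain only URL-safe characters.
    static HttpRequest buildRequest(final String baseUrl, final String account,
                                    final String retailer, final String job) {
        final URI uri = URI.create(String.format(
                "%s/api/v1/fluent-connect/scheduler/add/%s/%s/%s", baseUrl, account, retailer, job));
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(final String[] args) {
        final HttpRequest request = buildRequest("http://localhost:8080", "cnctdev", "34", "weather-update-job");
        System.out.println(request.method() + " " + request.uri());
    }
}
```

To actually trigger the job, the request can be sent with `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.discarding())` while the connector is running.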

Adding a new HTTP endpoint

Another way to receive updates from external systems is to open an HTTP endpoint. The example below receives the same payload we used in the queue example.

This is a straightforward example without authentication or authorization, and with hardcoded values for the Fluent account and message route. We recommend securing any endpoint exposed to the web with authentication and authorization.

package com.fluentcommerce.connect.custom;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fluentcommerce.connect.core.configuration.data.AccountReference;
import com.fluentcommerce.connect.core.listener.data.ConnectSDKMessage;
import com.fluentcommerce.connect.core.listener.publisher.ListenerPublisherService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import java.util.UUID;

// WeatherData lives in this same package (see the record defined earlier), so it needs no import
@RestController
@RequestMapping("/api/v1/fluent-connect/weather")
public class WeatherUpdateController {

    private final ListenerPublisherService publishService;

    // Hardcoded demo values read from application-connector.yml
    @Value("${account}")
    private String account;
    @Value("${retailer}")
    private String retailer;

    @Autowired
    public WeatherUpdateController(final ListenerPublisherService publishService) {
        this.publishService = publishService;
    }

    @RequestMapping(value = "/update", method = RequestMethod.POST, consumes = "application/json", produces = "application/json")
    public ResponseEntity<?> updateWeather(@RequestBody WeatherData weatherData) {
        final var accountReference = AccountReference.of(account, retailer);

        try {
            // Queue the payload so the handler registered for this route processes it
            publishService.publishMessage(ConnectSDKMessage.builder()
                    .id(UUID.randomUUID().toString())
                    .name("fc.connect.sample-weather-update")
                    .accountId(accountReference.accountId())
                    .retailerId(accountReference.retailerId())
                    .payload(ConnectSDKMessage.toJson(weatherData))
                    .build());
        } catch (final JsonProcessingException e) {
            return ResponseEntity.internalServerError().build();
        }
        return ResponseEntity.ok().build();
    }
}


The controller above expects configuration in `application-connector.yml` that holds the account and retailer. These should match the values used in `localstack-setup.sh`. Here is an example:

#Fluent Account setup for the demo - only here for the demo
account: "CNCTDEV"
retailer: 34


To test the HTTP endpoint, run the command below:

curl -X POST http://localhost:8080/api/v1/fluent-connect/weather/update -H "Content-Type:application/json" -d "{   \"id\": \"weather-update-message\", \"code\": \"300\",  \"temperature\": \"22\",  \"description\": \"Sunny Day\"}"

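
For callers written in Java, the same POST can be assembled with the JDK's `java.net.http` API. This is a sketch under stated assumptions: `WeatherPostSketch` and its nested `WeatherData` record are hypothetical stand-ins, and the hand-rolled `toJson` method does not escape special characters, so a JSON library would be a better fit for real payloads.

```java
import java.net.URI;
import java.net.http.HttpRequest;

// Sketch only: builds the POST request that the curl command above sends.
final class WeatherPostSketch {

    // Local copy of the demo payload shape, so the sketch is self-contained.
    record WeatherData(String id, String code, String temperature, String description) {}

    private WeatherPostSketch() {
    }

    // Naive JSON rendering: field values are inserted as-is, without escaping.
    static String toJson(final WeatherData data) {
        return String.format(
                "{\"id\":\"%s\",\"code\":\"%s\",\"temperature\":\"%s\",\"description\":\"%s\"}",
                data.id(), data.code(), data.temperature(), data.description());
    }

    static HttpRequest buildRequest(final String baseUrl, final WeatherData data) {
        return HttpRequest.newBuilder(URI.create(baseUrl + "/api/v1/fluent-connect/weather/update"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(toJson(data)))
                .build();
    }

    public static void main(final String[] args) {
        final WeatherData data = new WeatherData("weather-update-message", "300", "22", "Sunny Day");
        final HttpRequest request = buildRequest("http://localhost:8080", data);
        System.out.println(request.method() + " " + request.uri());
        System.out.println(toJson(data));
    }
}
```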

Copyright © 2024 Fluent Retail Pty Ltd (trading as Fluent Commerce). All rights reserved. No materials on this docs.fluentcommerce.com site may be used in any way and/or for any purpose without prior written authorisation from Fluent Commerce. Current customers and partners shall use these materials strictly in accordance with the terms and conditions of their written agreements with Fluent Commerce or its affiliates.
