Fluent Connector Configuration
Author: Fluent Commerce
Changed on: 20 Nov 2023
Overview
The Connect SDK expects a minimum of 4 configuration files:
- application-connector.yml - Project- and connector-specific configuration.
- application.yml - Core Connect SDK configuration. These are mostly default configurations and core settings.
- bootstrap.yml - Server/container configuration. This file includes settings for all the different environments and follows the Spring profile standards.
- logback-spring.xml - Log configuration, which also supports different configurations based on Spring profiles.
It is possible to have dedicated configuration files for each environment, often used to override a common setting for a particular environment. To create such a file, follow Spring Boot's naming format by appending the desired Spring profile name to the application.yml file name, for example, application-dev.yml.
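For instance, a dev-only override file can raise the log verbosity; a minimal sketch, using a standard Spring Boot logging property purely as an illustration:
logging:
  level:
    root: DEBUG  # dev-only verbosity; applies only while the 'dev' profile is active
Language: yaml
Name: application-dev.yml (illustrative)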
Key points
- Credential / Secrets Configuration
- Configuration Service
- Listener Configuration
- Job Scheduler Configuration
- Cache Configuration
- Product Availability
- API Key Configuration
- Spring actuators
Credential / Secrets Configuration
For the Connect SDK to interact with external systems, it requires access to their credentials, which must be stored in a secure location. The SDK provides a way for new credential storage services to be added, which is covered in the customisation guide. The example below uses a credential service from the connect-sdk-core-aws module; to configure your credentials in your development environment, follow the Credential / Secret Setup Guide.
fluent-connect:
  credential:
    credential-manager: "aws-secrets-manager"
Language: yaml
Name: Code
Property | Required | Description | Default |
--- | --- | --- | --- |
credential-manager | yes | ID of the credential service used by the SDK. | n/a |
Basic SDK Secret Configuration
- Active accounts: tells the SDK which Fluent region the account is in and which retailers the Connect SDK will consider when processing messages.
Available regions:
- sydney
- dublin
- singapore
- north_america
Secret name:
`fc/connect/{connector-name}/api/fluent/activeAccounts`
Sample Configuration:
{
  "accounts": [
    {
      "name": "CNCTDEV",
      "region": "sydney",
      "retailers": [
        1,
        34,
        67,
        100
      ]
    }
  ]
}
Language: json
Name: Example
- Fluent Credentials
These are the credentials the SDK uses to make requests to Fluent when pushing or pulling information. This configuration is mandatory when using listeners in the SDK. You can add as many entries as necessary to cover all the retailers using the SDK.
Secret name:
`fc/connect/{connector-name}/{your-fluent-account}/api/fluent-account/1`
Sample Configuration:
{
  "retailer": "1",
  "userName": "<FLUENT-RETAILER-USERNAME>",
  "password": "<FLUENT-RETAILER-PASSWORD>"
}
Language: json
Name: Example
Custom Secrets
It is possible to store any secret in the secret manager and have the SDK use it. All secret names must adhere to the SDK naming convention; otherwise, they will be discarded and not used. All secrets must be prefixed with:
`fc/connect/{connector-name}/`
For example:
`fc/connect/{connector-name}/my-secret`
Configuration Service
The SDK uses a configuration service to read runtime configuration/settings. The default implementation uses Fluent settings as the storage location, but it is possible to add a new service to manage the runtime configuration/settings.
fluent-connect:
  configuration:
    settings-manager: "fluent-settings"
Language: yaml
Name: Code
Property | Required | Description | Default |
--- | --- | --- | --- |
settings-manager | yes | ID of the configuration service used by the SDK. The default setting uses Fluent Settings as the storage location. | fluent-settings |
General Configuration
The Connect SDK provides the following general configuration.
Property | Required | Description | Default |
--- | --- | --- | --- |
connector-name | yes | The connector's name, used both to name the connector and to build the path of other configuration keys. Keep characters limited to 'a-z' and '-'. | n/a |
event-log-manager | no | ID of the service that will log SDK events. There are two pre-defined options. | |
enable-listeners | no | Enable or disable all SDK listeners. | true |
disable-listeners | no | A list of handlers to be disabled. The routes that use these disabled handlers will be disabled as well. | empty |
connector-name: <my-connector-name>
event-log-manager: "fluent-eventlog"
Language: yaml
Name: Sample configuration template
The table below describes the default configuration keys provided by the SDK. These are used when fetching configuration (Fluent settings). Note that altering these may affect the behavior of the SDK.
Property | Description | Default |
--- | --- | --- |
base-path-key | Controls the base key path of all configurations of the SDK. Nothing outside the path is read by the SDK. | |
account-mapping-key | Maps a Fluent account-retailer to an external system. The identification of the external system must be unique, and it can be a composite value. | |
general-key | Any general settings that don't quite fit in any category. | |
log-level-key | Global setting key controlling the SDK's log level (DEBUG, INFO, WARN, ERROR). Changes to this property take effect on the first configuration refresh in the SDK. | |
unit-conversion-key | Used for unit conversion configuration, for example, Kilogram to Gram. | |
Configuration Samples
{
  "fluent": {
    "accountId": "FCTDEV",
    "retailerId": "100"
  },
  "externalAccount": {
    "projectKey": "my-project-qa",
    "uid": "123"
  }
}
Language: json
Name: Account Mapping (account-mapping-key)
{
  "productCatalogue": {
    "entityRef": "PC:MASTER:100",
    "entitySubType": "MASTER"
  }
}
Language: json
Name: General Settings (general-key)
{
  "conversionSettings": [
    {
      "fromUnit": "cm",
      "toUnit": "mm",
      "conversionFactor": 10
    },
    {
      "fromUnit": "kg",
      "toUnit": "gm",
      "conversionFactor": 1000
    }
  ]
}
Language: json
Name: Unit Conversion (unit-conversion-key)
{
  "fluent": {
    "PROCESSING": "Initial",
    "AWAITING_WAVE": "Initial",
    "ASSIGNED": "picking",
    "PARTIALLY_FULFILLED": "picking",
    "FULFILLED": "picking",
    "AWAITING_COURIER_COLLECTION": "readyToShip",
    "AWAITING_CUSTOMER_COLLECTION": "readyToShip",
    "COMPLETE": "shipped",
    "ESCALATED": "Initial",
    "REJECTED": "backorder",
    "CANCELLED": "Cancelled"
  }
}
Language: json
Name: Entity-level configuration, for example, fulfilment (entity.fulfilment.status-mapping)
Listener Configuration
Listeners allow connectors to subscribe to queues/topics and are enabled by default. To disable all listeners, including the SDK's internal listeners, set the following property:
`enable-listeners: false`
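To disable only selected handlers instead, the `disable-listeners` property from the general configuration can be used; a minimal sketch, with a hypothetical handler name. The full set of listener properties follows below.
enable-listeners: true
disable-listeners:
  - "my-custom-handler"  # hypothetical handler; routes using it are disabled as well
Language: yaml
Name: Example (hypothetical handler name)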
listeners:
  # listener configuration block
  listener-id:
    name: "ENV_VARIABLE"
    fluent-account: FLUENT_ACCOUNT
    type: "sqs"
    core-pool-size: 5
    pool-size: 20
    prefix: "my-prefix"
    shutdown-timeout: 300
    visibility-timeout: 120
    poll-wait: 20
    dlq-suffix: "-dlq"
    retry: 5
Language: yaml
Name: listeners
Property | Required | Description | Default value |
--- | --- | --- | --- |
listener-id | yes | Actual ID of the listener. This must match the ID defined on the class using @ListenerInfo. | n/a |
name | yes | Name of the environment variable that will contain the real queue name. | Empty String |
type | yes | Type of listener, based on the list of listener types supported by the Connect SDK. Note: the required SDK library must be present to use different listener types. | NOT_DEFINED |
fluent-account | required for custom queues (aka external queues) | Fluent account name. This restricts the processing of messages to the services and configuration for that account. | n/a |
core-pool-size | no | Minimum number of threads available to process messages from the queue. | 10 |
pool-size | no | Maximum number of threads available to process messages from the queue; determines how many messages can be processed in parallel. | 200 |
prefix | no | Prefix given to the threads processing messages. | default |
shutdown-timeout | no | Wait time in seconds to allow in-flight threads to gracefully finish processing before forcing them to quit. | 300 |
visibility-timeout | no | Queue visibility timeout in seconds. Messages read from the queue are kept invisible to other listeners while the node that took the message processes it. If the message is not deleted before the timeout, it is made available again for processing (retry in case of errors). | 120 |
poll-wait | no | Listener wait time in seconds to read as many items from the queue as possible. | 10 |
dlq-suffix | no | Dead letter queue suffix. The DLQ name is generated from the queue name with the suffix appended to the end. | |
retry | no | Queue retry policy: how often the message is put back on the queue for retry before it goes to the DLQ. | 2 |
max-allowed-pull-size | no | Maximum number of messages allowed to be pulled from the queue. | 10 |
props | no | Additional listener configuration. The Connect SDK reads all configuration set under props and stores it as a Map. | n/a |
Kafka Listener Configuration
Kafka is a distributed message broker that can be utilised as a messaging queue for SDK listeners. The SDK provides a seamless experience for listeners to subscribe to different message brokers through configuration if the correct SDK library is available.
To configure the Kafka brokers with the SDK, set the environment variable `KAFKA_BROKERS` to the broker list, for example:
`localhost:9092`
Every listener requires at least two topics: one main topic to receive the incoming messages, and another as a dead letter queue for failed messages. If a retry policy is configured, additional topics are required, one per configured retry. The SDK expects the following naming convention for the topics, which the Kafka administrator must create, as the SDK will not create them automatically.
Topic type | Listener configuration sample | Expected topic names |
--- | --- | --- |
Main topic | name = notification | notification |
DLQ topic | dlq-suffix = -dlq | notification-dlq |
Retry topics | retry = 2 | notification-retry-1, notification-retry-2 |
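Putting the naming convention together, below is a sketch of a Kafka listener built from the properties documented above; the listener ID, environment variable, account, and the `kafka` type value are assumptions for illustration:
listeners:
  notification-listener:        # hypothetical listener ID
    name: "NOTIFICATION_TOPIC"  # assumed to resolve to the topic name 'notification'
    fluent-account: CNCTDEV     # hypothetical account
    type: "kafka"               # assumed type value for the Kafka listener library
    dlq-suffix: "-dlq"          # expects topic 'notification-dlq'
    retry: 2                    # expects topics 'notification-retry-1' and 'notification-retry-2'
Language: yaml
Name: Example (assumed values)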
Key differences between SQS and Kafka
- Both listeners work with a thread pool, but the main difference is that SQS constantly pulls messages from the queue and processes them if threads are idle in the pool. On the other hand, Kafka works with batches, reads X messages from the topic, and is only capable of reading more messages once the first batch has completed processing.
- Kafka requires additional topics to support its retry policy
- Messages processed from SQS are removed from the queue, whereas in Kafka, they remain there after processing. Note that Kafka is not an event store, and selectively reprocessing old messages is not possible.
Retry mechanism:
Kafka doesn't have an inbuilt retry mechanism like AWS SQS, so to achieve similar behavior the SDK implements its own retry policy using additional topics, forwarding messages between the different topics of a listener.
Each Kafka listener configured with a retry policy works with a pair of listeners. The first is dedicated to processing messages from the main topic, while the second subscribes to all retry topics. The main topic listener never stops processing messages. The retry listener regularly pauses its subscriptions to allow the retry interval to elapse and resumes when it is time to process the messages waiting for a retry.
Job Scheduler Configuration
The Connect SDK doesn't have an internal scheduler; it depends on external systems to trigger an HTTP request for a job to be executed, as described below.
POST https://<domain>/api/v1/fluent-connect/scheduler/add/<fluent-account>/<fluent-retailer>/<job-name>
Language: text
Name: POST
However, it is possible to extend the SDK and add different types of schedulers. The module `connect-sdk-core-aws` provides a `message-queue` scheduler type:
job-scheduler:
  job-scheduler-type: "message-queue"
Language: yaml
Name: job-scheduler
All job requests require three parameters:
- fluent-account: Fluent account name
- fluent-retailer: Fluent retailer id
- job-name: The name of the handler that will execute the job
POST https://localhost:8080/api/v1/fluent-connect/scheduler/add/cnctdev/1/batch-inventory-sync
Language: text
Name: Example
The execution of a job is performed asynchronously, and the HTTP request returns 200 OK if the Connect SDK managed to queue the execution correctly. All jobs in the Connect SDK share the same pre- and post-execution steps:
- A job cannot have multiple instances running simultaneously
- Retrieves the job configuration from Fluent settings.
- Calculates a date range (from-to datetime) the job may use during its execution.
- Executes the actual job.
- Persists the calculated date range as part of the job configuration for the next execution.
- The Connect SDK runs in UTC time.
There are two kinds of configuration for a job:
- Runtime configuration - More frequently updated properties that control the job behavior.
- Build-time configuration - Properties that limit or give purpose to the job, which the end user should not modify.
Runtime configuration
- Kept in Fluent as settings.
- The configuration key of a job is determined dynamically by taking the configured prefix (`job-scheduler.config-prefix`) and appending the job name at the end. The default template resolves to:
`fc.connect.<my-sample-connector>.batch.<job-name>`
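Since the prefix lives under the same `job-scheduler` block shown earlier, it can be overridden in the YAML configuration; a minimal sketch, with an illustrative value:
job-scheduler:
  config-prefix: "fc.connect.my-sample-connector.batch"  # illustrative value; the job name is appended at the end
Language: yaml
Name: Example (illustrative value)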
Every job gets a pre-calculated date range, and the SDK provides this to any job handler as part of the job context. This date range allows the job to work with data in delta mode (if required), only fetching/processing items that have changed since the last execution of the job.
Below is an example of the configuration of a job after a first execution, with some properties in `props`. Since there was no `previousEndDate` before that run, the date range started at epoch (`lastRun.param.start`) and ended at the execution time (`lastRun.param.end`), which became the `previousEndDate` for the next run.
{
  "previousEndDate": "2021-09-22T03:00:00Z",
  "props": {
    "fluentNetworkRef": "BASE_67"
  },
  "lastRun": {
    "param": {
      "start": "1970-01-01T00:00:00Z",
      "end": "2021-09-22T03:00:00Z",
      "props": {
        "fluentNetworkRef": "BASE_67"
      }
    },
    "jobStart": "2021-09-22T03:00:00Z",
    "jobEnd": "2021-09-22T03:05:00Z",
    "status": "SUCCESSFUL"
  }
}
Language: json
Name: Code
The next day the job runs again: the current value of `previousEndDate` is used as `lastRun.param.start`, the new execution time becomes `lastRun.param.end`, and that end value is then persisted as the `previousEndDate` for the following run.
{
  "previousEndDate": "2021-09-22T02:00:00Z",
  "props": {
    "fluentNetworkRef": "BASE_67"
  },
  "lastRun": {
    "param": {
      "start": "2021-09-22T03:00:00Z",
      "end": "2021-09-23T02:00:00Z",
      "props": {
        "fluentNetworkRef": "BASE_67"
      }
    },
    "jobStart": "2021-09-23T02:00:00Z",
    "jobEnd": "2021-09-23T02:05:00Z",
    "status": "SUCCESSFUL"
  }
}
Language: json
Name: Code
It is also possible for a user to override the normal job operation described above by providing a custom date range. These user-specified dates are only used once and will not be taken into account for subsequent executions.
{
  "customDateRange": {
    "start": "2021-04-22T00:00:00Z",
    "end": "2021-04-23T00:00:00Z"
  },
  "previousEndDate": "2021-09-22T02:00:00Z",
  "props": {
    "fluentNetworkRef": "BASE_67"
  },
  "lastRun": {
    "param": {
      "start": "2021-09-22T03:00:00Z",
      "end": "2021-09-23T02:00:00Z",
      "props": {
        "fluentNetworkRef": "BASE_67"
      }
    },
    "jobStart": "2021-09-23T02:00:00Z",
    "jobEnd": "2021-09-23T02:05:00Z",
    "status": "SUCCESSFUL"
  }
}
Language: json
Name: Code
After the job runs with the custom dates, the custom range is removed and `previousEndDate` is left unchanged, so the custom run does not affect the next regular execution.
{
  "previousEndDate": "2021-09-22T02:00:00Z",
  "props": {
    "fluentNetworkRef": "BASE_67"
  },
  "lastRun": {
    "param": {
      "start": "2022-04-22T00:00:00Z",
      "end": "2022-04-23T00:00:00Z",
      "props": {
        "fluentNetworkRef": "BASE_67"
      }
    },
    "jobStart": "2021-09-24T02:00:00Z",
    "jobEnd": "2021-09-24T02:05:00Z",
    "status": "SUCCESSFUL"
  }
}
Language: json
Name: Code
While the job is running, a temporary property called `executionStart` is kept in the configuration. It records when the current execution started and is used to prevent multiple instances of the same job from running simultaneously; it is removed once the job completes.
{
  "previousEndDate": "2021-09-22T02:00:00Z",
  "executionStart": "2021-09-24T02:00:00Z",
  "props": {
    "fluentNetworkRef": "BASE_67"
  },
  "lastRun": {
    "param": {
      "start": "2022-04-22T00:00:00Z",
      "end": "2022-04-23T00:00:00Z",
      "props": {
        "fluentNetworkRef": "BASE_67"
      }
    },
    "jobStart": "2021-09-24T02:00:00Z",
    "jobEnd": "2021-09-24T02:05:00Z",
    "status": "SUCCESSFUL"
  }
}
Language: json
Name: Code
Property | Description |
--- | --- |
customDateRange | Optional date range a user can set to override the job's current calculated date range. |
customDateRange.start | Custom user date range start datetime in UTC. |
customDateRange.end | Custom user date range end datetime in UTC. |
previousEndDate | Used to keep track of the last execution date; this drives how the SDK calculates the next date range. Whatever value is set here is used as the next start date. If it is missing or invalid, epoch is used as the start date. The end date is always the current time the job is executed. |
props | Optional job settings. |
lastRun | Keeps a copy of the last execution parameters and the date range used, as well as the job execution times. |
lastRun.param.start | Last execution date range start in UTC. |
lastRun.param.end | Last execution date range end in UTC. |
lastRun.param.props | Last execution props. |
lastRun.jobStart | Date-time the last execution started, in UTC. |
lastRun.jobEnd | Date-time the last execution finished, in UTC. |
lastRun.status | Status of how the last execution finished: FAILED / SUCCESSFUL. |
Kafka Limitations
When using a Kafka topic to queue job requests from a scheduler, it's important to consider that Kafka works differently from SQS. When the SDK reads messages from the job topic, it takes the available job requests and starts processing them. If new job requests arrive while the first set is still being processed, they remain queued until the SDK completes the first set and can take the queued jobs. A queued job's time to live is tightly coupled to how frequently the SDK polls the topic, and the lowest frequency allowed is 1 minute. It is not possible to guarantee that the first set of job requests will complete before the second set expires. When job requests expire in the topic, they are discarded and moved to the DLQ; the scheduled job is not executed and must wait until the scheduler issues another request for its execution.
Build configuration
The first level of job configuration is at the class where the job is written, through the `@HandlerInfo` annotation:
@HandlerInfo(
    name = "InventorySyncJob",
    route = "batch-inventory-sync",
    description = "Sync inventory positions from Fluent to Commercetools",
    props = {@HandlerProp(key = "page-size", value = "500")}
)
Language: java
Name: Code
To override an existing job configuration, it is possible to use the YAML configuration files. The first route below simply changes the properties by giving the job a larger page size. The second route uses the same job class, identified by the handler name `InventorySyncJob`, to create an additional route with its own properties.
routes:
  job:
    - route: "batch-inventory-sync"
      handler-name: "InventorySyncJob"
      props:
        page-size: 1000
    - route: "custom-inventory-sync"
      handler-name: "InventorySyncJob"
      props:
        page-size: 200
        another-property: "some value"
Language: yaml
Name: Code
Cache Configuration
The Connect SDK uses Spring cache to store some data in memory. The table below lists the standard caches from the SDK.
Cache Name | Description | Default Expiry in Seconds |
--- | --- | --- |
fluent-api-client | Fluent API Client cache | 300 |
account-mapping | Fluent account mapping cache. Often used to map a Fluent account to external systems or vice versa | 300 |
account-mappings | Fluent account mapping cache. Often used to map a Fluent account to external systems or vice versa | 300 |
account-configuration | Configuration (Fluent settings) cache | 300 |
fluent-auth-token-context | Handler context cache from a Fluent Auth token | 300 |
api-key-context | Handler context cache from an SDK API key | 300 |
It is possible both to create new caches and to update the settings of existing ones through the YAML configuration file.
fluent-connect:
  cache:
    caffeine:
      - name: "fluent-api-client"
        expiryInSeconds: 600
      - name: YourNewCache
        expiryInSeconds: 600
Language: yaml
Name: Code
To make use of the cache, add a new entry for your cache and add the following annotation to your method:
@Cacheable(cacheManager = CacheConfiguration.SDK_CACHE_MANAGER, cacheNames = "YourNewCache")
Language: java
Name: Code
Product Availability
Feature Configuration
Property | Description | Default |
--- | --- | --- |
fulfilment-option.enabled | Enables the product availability endpoint. If set to false, the endpoint is not available and requests result in a 404 response. | true |
fulfilment-option.logging.enabled | Logs extra information on the request, response, and execution time of each enrichment step. Not recommended for production environments. | false |
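A sketch of how these flags could appear in the connector YAML; the exact nesting of the `fulfilment-option` block in the file is an assumption based on the dotted property names above:
fulfilment-option:
  enabled: true
  logging:
    enabled: false  # assumption: YAML nesting mirrors the dotted property names
Language: yaml
Name: Example (assumed nesting)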
API Key Configuration
API keys are configured directly in the Secret Manager, just like any other credential used by the SDK. API keys are specific to a Fluent account and therefore must be configured using the following secret key template:
`fc/connect/{connector-name}/{fluent-account-name}/api/fluent-account/api-keys`
where `connector-name` is your connector's name and `fluent-account-name` is the Fluent account the keys belong to.
A token has the following properties:
- API Key size: must contain at least 16 characters.
- Duplicate keys: keys must be unique. Duplicate tokens are deemed invalid by the SDK and disregarded.
- Expiry: May or may not have an expiry. Tokens without an expiry will always be valid. The expiry date-time must be in UTC format as shown below.
- Caching: keys are cached by the SDK, so modifying an API key configuration may not have an immediate effect (see the cache sketch after this list).
- Multi-retailer: It's possible to have any number of retailers with any number of keys for each retailer.
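Because keys are cached in the `api-key-context` cache listed under Cache Configuration, its expiry can be shortened if key changes must propagate faster; a minimal sketch reusing the documented cache settings (the value is illustrative):
fluent-connect:
  cache:
    caffeine:
      - name: "api-key-context"
        expiryInSeconds: 60  # shorter expiry so API key changes are picked up sooner
Language: yaml
Name: Example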
For more information on how to configure your credentials in your development environment, follow the Credential / Secret Setup Guide.
Example of a token configuration:
{
  "api-keys": [
    {
      "retailer": "1",
      "username": "fluent-username-1",
      "password": "ZMCZZZ",
      "keys": [
        {
          "key": "1283712983721983",
          "expiry": "2020-12-30T20:28:07.00Z"
        },
        {
          "key": "3333332222222222"
        }
      ]
    },
    {
      "retailer": "34",
      "username": "fluent-username-34",
      "password": "ZMCFFF",
      "keys": [
        {
          "key": "2222333344445555"
        }
      ]
    }
  ]
}
Language: json
Name: Code
Use the command below to test a token, remembering first to update the URL and token. A successful configuration returns the details of the user configured for the specified token; otherwise, a 401 Unauthorized response is returned.
curl -X 'GET' \
  'http://localhost:8080/api/v1/fluent-connect/api-key' \
  -H 'Authorization: ApiKey 1283712983721983' \
  -H 'fluent.account: CNCTDEV'
Language: bash
Name: Code
Spring actuators
The SDK is built with the Spring Framework and bundles a couple of custom actuators on top of the default ones. The list of actuators enabled in the SDK can be found in the bootstrap.yml file. To view the active actuators, see http://localhost:8080/actuator.
To learn more about actuators, please see Spring Boot Actuator Web API Documentation.
These are the default SDK settings for actuators:
management:
  endpoints:
    web:
      exposure:
        include: health, info, caches, scheduledtasks, sdkhandlers, sdkroutes
Language: yaml
Name: Code