Skip to content

Instantly share code, notes, and snippets.

@atomatt
Last active January 1, 2024 11:23
Show Gist options
  • Select an option

  • Save atomatt/576b7b9359d1fa10d5665b250a8e23a7 to your computer and use it in GitHub Desktop.

Select an option

Save atomatt/576b7b9359d1fa10d5665b250a8e23a7 to your computer and use it in GitHub Desktop.
Benthos example: three SFTP servers holding duplicated content, with at-least-once delivery for each file found.
# Input: a broker that merges three SFTP inputs into a single stream.
# Every server is read in watcher mode; each one has its own poll interval
# and its own cache resource (declared under cache_resources).
input:
  broker:
    inputs:
      # Server 1: polled every 3s, watcher cache "sftp1".
      - sftp:
          address: "sftp1:22"
          credentials:
            username: u
            password: p
          paths:
            - "/inbox/*.txt"
          delete_on_finish: true
          watcher:
            enabled: true
            poll_interval: 3s
            cache: sftp1

      # Server 2: polled every 4s, watcher cache "sftp2".
      - sftp:
          address: "sftp2:22"
          credentials:
            username: u
            password: p
          paths:
            - "/inbox/*.txt"
          delete_on_finish: true
          watcher:
            enabled: true
            poll_interval: 4s
            cache: sftp2

      # Server 3: polled every 5s, watcher cache "sftp3".
      - sftp:
          address: "sftp3:22"
          credentials:
            username: u
            password: p
          paths:
            - "/inbox/*.txt"
          delete_on_finish: true
          watcher:
            enabled: true
            poll_interval: 5s
            cache: sftp3
# Output: look each file's path up in the "seen" cache, drop messages whose
# path is already marked, and otherwise print to stdout and then mark the path.
output:
  processors:
    # The branch performs the cache lookup and keeps only its result as the
    # "seen" metadata value.
    - branch:
        processors:
          - cache:
              resource: seen
              operator: get
              key: '${! meta("sftp_path" )}'
          # If the lookup failed, replace the branch payload with an empty
          # string so "seen" ends up empty rather than "t".
          - catch:
              - mapping: 'root = ""'
        result_map: |
          meta seen = content()

  switch:
    cases:
      # Path already marked as seen: discard the message.
      - check: 'meta("seen") == "t"'
        output:
          drop: {}

      # Otherwise (no check on this case): write to stdout first, then run the
      # cache "set" attached to the trailing drop output. The outputs are
      # listed in this order under the fan_out_sequential pattern.
      - output:
          broker:
            pattern: fan_out_sequential
            outputs:
              - stdout: {}
              - drop: {}
                processors:
                  - cache:
                      resource: seen
                      operator: set
                      key: '${! meta("sftp_path" )}'
                      value: "t"
# Cache resources, all in-memory with default settings.
cache_resources:
  # Holds the paths of files already delivered (value "t").
  - label: seen
    memory: {}

  # One cache per SFTP input, referenced by each input's watcher.cache field.
  - label: sftp1
    memory: {}
  - label: sftp2
    memory: {}
  - label: sftp3
    memory: {}
# Compose services: three SFTP servers plus the Benthos process reading them.
services:
  # Each SFTP server mounts the shared sftp.json config and its own local
  # directory (./sftp/1, ./sftp/2, ./sftp/3) at /home/u/inbox.
  sftp1:
    image: emberstack/sftp
    volumes:
      - "./sftp.json:/app/config/sftp.json"
      - "./sftp/1:/home/u/inbox"

  sftp2:
    image: emberstack/sftp
    volumes:
      - "./sftp.json:/app/config/sftp.json"
      - "./sftp/2:/home/u/inbox"

  sftp3:
    image: emberstack/sftp
    volumes:
      - "./sftp.json:/app/config/sftp.json"
      - "./sftp/3:/home/u/inbox"

  # Benthos with the local config mounted at /benthos.yaml.
  benthos:
    image: jeffail/benthos
    volumes:
      - "./benthos.yaml:/benthos.yaml"
    command:
      - "-w"
      - "--set"
      - "logger.level=warn"
{
    "Global": {
        "Chroot": {
            "Directory": "%h"
        }
    },
    "Users": [
        {
            "Username": "u",
            "Password": "p"
        }
    ]
}
# Pipeline variant: do the "seen" lookup in the pipeline and delete messages
# whose path is already marked, instead of routing them to a drop output.
pipeline:
  processors:
    # Look the file path up in the "seen" cache; only the lookup result is
    # copied back, as the "seen" metadata value.
    - branch:
        processors:
          - cache:
              resource: seen
              operator: get
              key: '${! meta("sftp_path" )}'
          # If the lookup failed, use an empty payload so "seen" is empty.
          - catch:
              - mapping: 'root = ""'
        result_map: |
          meta seen = content()

    # Delete the message when its path was already marked with "t".
    - mapping: |
        root = if meta("seen") == "t" { deleted() }
# Output variant: print to stdout, then record the file path in the "seen"
# cache. The cache "set" is attached to a trailing drop output, listed after
# stdout under the fan_out_sequential pattern.
output:
  broker:
    pattern: fan_out_sequential
    outputs:
      - stdout: {}
      - drop: {}
        processors:
          - cache:
              resource: seen
              operator: set
              key: '${! meta("sftp_path" )}'
              value: "t"
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment