Created
March 1, 2020 05:53
-
-
Save md-jamal/bbad92f48e5842038246af3bf623e7ca to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| /* | |
| * Copyright (c) 2019 Intel Corporation | |
| * | |
| * SPDX-License-Identifier: Apache-2.0 | |
| */ | |
| #include <logging/log.h> | |
| LOG_MODULE_REGISTER(aws_mqtt, LOG_LEVEL_DBG); | |
| #include <zephyr.h> | |
| #include <net/socket.h> | |
| #include <net/mqtt.h> | |
| #include <net/net_config.h> | |
| #include <net/net_event.h> | |
| #include <net/sntp.h> | |
| #include <sys/printk.h> | |
| #include <string.h> | |
| #include <errno.h> | |
| #include "dhcp.h" | |
| #include <time.h> | |
| #include <inttypes.h> | |
| #include "test_certs.h" | |
| #include "globalsign.inc" | |
| #if defined(CONFIG_SOCKS) | |
| static struct sockaddr socks5_proxy; | |
| #endif | |
| static struct pollfd fds[1]; | |
| static int nfds; | |
| #define APP_MQTT_BUFFER_SIZE 128 | |
| #define APP_CA_CERT_TAG 1 | |
| #define APP_PSK_TAG 2 | |
| static sec_tag_t m_sec_tags[] = { | |
| #if defined(MBEDTLS_X509_CRT_PARSE_C) || defined(CONFIG_NET_SOCKETS_OFFLOAD) | |
| APP_CA_CERT_TAG, | |
| #endif | |
| #if defined(MBEDTLS_KEY_EXCHANGE__SOME__PSK_ENABLED) | |
| APP_PSK_TAG, | |
| #endif | |
| }; | |
| /* Buffers for MQTT client. */ | |
| static u8_t rx_buffer[APP_MQTT_BUFFER_SIZE]; | |
| static u8_t tx_buffer[APP_MQTT_BUFFER_SIZE]; | |
| static bool connected; | |
| #define MQTT_CLIENTID "zephyr_publisher" | |
| struct zsock_addrinfo *haddr; | |
| s64_t time_base; | |
| static struct sockaddr_storage broker; | |
| static struct mqtt_client client_ctx; | |
| static void clear_fds(void) | |
| { | |
| nfds = 0; | |
| } | |
| time_t my_k_time(time_t *ptr) | |
| { | |
| s64_t stamp; | |
| time_t now; | |
| stamp = k_uptime_get(); | |
| now = (time_t)((stamp + time_base) / 1000); | |
| if (ptr) { | |
| *ptr = now; | |
| } | |
| return now; | |
| } | |
| static void broker_init(void) | |
| { | |
| struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker; | |
| broker4->sin_family = AF_INET; | |
| broker4->sin_port = htons(8883); | |
| net_ipaddr_copy(&broker4->sin_addr, | |
| &net_sin(haddr->ai_addr)->sin_addr); | |
| #if defined(CONFIG_SOCKS) | |
| struct sockaddr_in *proxy4 = (struct sockaddr_in *)&socks5_proxy; | |
| proxy4->sin_family = AF_INET; | |
| proxy4->sin_port = htons(1081); | |
| inet_pton(AF_INET, "agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com", &proxy4->sin_addr); | |
| #endif | |
| } | |
| void mqtt_evt_handler(struct mqtt_client *const client, | |
| const struct mqtt_evt *evt) | |
| { | |
| int err; | |
| switch (evt->type) { | |
| case MQTT_EVT_CONNACK: | |
| if (evt->result != 0) { | |
| LOG_ERR("MQTT connect failed %d", evt->result); | |
| break; | |
| } | |
| connected = true; | |
| LOG_INF("MQTT client connected!"); | |
| break; | |
| case MQTT_EVT_DISCONNECT: | |
| LOG_INF("MQTT client disconnected %d", evt->result); | |
| connected = false; | |
| clear_fds(); | |
| break; | |
| case MQTT_EVT_PUBACK: | |
| if (evt->result != 0) { | |
| LOG_ERR("MQTT PUBACK error %d", evt->result); | |
| break; | |
| } | |
| LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id); | |
| break; | |
| case MQTT_EVT_PUBREC: | |
| if (evt->result != 0) { | |
| LOG_ERR("MQTT PUBREC error %d", evt->result); | |
| break; | |
| } | |
| LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id); | |
| const struct mqtt_pubrel_param rel_param = { | |
| .message_id = evt->param.pubrec.message_id | |
| }; | |
| err = mqtt_publish_qos2_release(client, &rel_param); | |
| if (err != 0) { | |
| LOG_ERR("Failed to send MQTT PUBREL: %d", err); | |
| } | |
| break; | |
| case MQTT_EVT_PUBCOMP: | |
| if (evt->result != 0) { | |
| LOG_ERR("MQTT PUBCOMP error %d", evt->result); | |
| break; | |
| } | |
| LOG_INF("PUBCOMP packet id: %u", | |
| evt->param.pubcomp.message_id); | |
| break; | |
| default: | |
| break; | |
| } | |
| } | |
| static void client_init(struct mqtt_client *client) | |
| { | |
| static struct mqtt_utf8 password; | |
| static struct mqtt_utf8 username; | |
| mqtt_client_init(client); | |
| broker_init(); | |
| password.utf8 = (u8_t *)CONFIG_SAMPLE_CLOUD_AWS_PASSWORD; | |
| password.size = strlen(CONFIG_SAMPLE_CLOUD_AWS_PASSWORD); | |
| client->password = &password; | |
| username.utf8 = (u8_t *)CONFIG_SAMPLE_CLOUD_AWS_USERNAME; | |
| username.size = strlen(CONFIG_SAMPLE_CLOUD_AWS_USERNAME); | |
| client->user_name = &username; | |
| /* MQTT client configuration */ | |
| client->broker = &broker; | |
| client->evt_cb = mqtt_evt_handler; | |
| client->client_id.utf8 = (u8_t *)MQTT_CLIENTID; | |
| client->client_id.size = strlen(MQTT_CLIENTID); | |
| client->password = NULL; | |
| client->user_name = NULL; | |
| client->protocol_version = MQTT_VERSION_3_1_1; | |
| /* MQTT buffers configuration */ | |
| client->rx_buf = rx_buffer; | |
| client->rx_buf_size = sizeof(rx_buffer); | |
| client->tx_buf = tx_buffer; | |
| client->tx_buf_size = sizeof(tx_buffer); | |
| client->transport.type = MQTT_TRANSPORT_SECURE; | |
| struct mqtt_sec_config *tls_config = &client->transport.tls.config; | |
| tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED; | |
| tls_config->cipher_list = NULL; | |
| tls_config->sec_tag_list = m_sec_tags; | |
| tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags); | |
| tls_config->hostname = "agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com"; | |
| client->transport.type = MQTT_TRANSPORT_SECURE; | |
| #if defined(CONFIG_SOCKS) | |
| mqtt_client_set_proxy(client, &socks5_proxy, | |
| socks5_proxy.sa_family == AF_INET ? | |
| sizeof(struct sockaddr_in) : | |
| sizeof(struct sockaddr_in6)); | |
| #endif | |
| } | |
| #define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR") | |
| #define PRINT_RESULT(func, rc) \ | |
| LOG_INF("%s: %d <%s>", (func), rc, RC_STR(rc)) | |
| static void prepare_fds(struct mqtt_client *client) | |
| { | |
| if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) { | |
| fds[0].fd = client->transport.tcp.sock; | |
| } | |
| #if defined(CONFIG_MQTT_LIB_TLS) | |
| else if (client->transport.type == MQTT_TRANSPORT_SECURE) { | |
| fds[0].fd = client->transport.tls.sock; | |
| } | |
| #endif | |
| fds[0].events = ZSOCK_POLLIN; | |
| nfds = 1; | |
| } | |
| static void wait(int timeout) | |
| { | |
| if (nfds > 0) { | |
| if (poll(fds, nfds, timeout) < 0) { | |
| LOG_ERR("poll error: %d", errno); | |
| } | |
| } | |
| } | |
| static int try_to_connect(struct mqtt_client *client) | |
| { | |
| u8_t retries = 3U; | |
| int rc; | |
| LOG_DBG("attempting to connect..."); | |
| client_init(client); | |
| rc = mqtt_connect(client); | |
| if (rc != 0) { | |
| PRINT_RESULT("mqtt_connect", rc); | |
| k_sleep(500); | |
| } else { | |
| prepare_fds(client); | |
| wait(500); | |
| mqtt_input(client); | |
| if (!connected) { | |
| LOG_INF("Aborting connection\n"); | |
| mqtt_abort(client); | |
| } | |
| } | |
| if (connected) { | |
| return 0; | |
| } | |
| return -EINVAL; | |
| } | |
| static int tls_init(void) | |
| { | |
| int err = -EINVAL; | |
| LOG_INF("Ca size:%d\n", sizeof(ca_certificate)); | |
| err = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE, | |
| globalsign_certificate, sizeof(globalsign_certificate)); | |
| if (err < 0) { | |
| LOG_ERR("Failed to register public certificate: %d", err); | |
| return err; | |
| } | |
| return err; | |
| } | |
| void mqtt_startup(char *hostname, int port) | |
| { | |
| struct mqtt_client *client = &client_ctx; | |
| int retries = 5; | |
| int err, cnt; | |
| static struct zsock_addrinfo hints; | |
| int res = 0; | |
| int rc; | |
| hints.ai_family = AF_INET; | |
| hints.ai_socktype = SOCK_STREAM; | |
| hints.ai_protocol = 0; | |
| cnt = 0; | |
| while ((err = getaddrinfo("agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com", "8883", &hints, | |
| &haddr)) && cnt < 3) { | |
| LOG_ERR("Unable to get address for broker, retrying"); | |
| cnt++; | |
| } | |
| if (err != 0) { | |
| LOG_ERR("Unable to get address for broker, error %d", | |
| res); | |
| return; | |
| } | |
| LOG_INF("DNS resolved for agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com:8833"); | |
| try_to_connect(client); | |
| PRINT_RESULT("try_to_connect", rc); | |
| } | |
| static void show_addrinfo(struct addrinfo *addr) | |
| { | |
| char hr_addr[NET_IPV6_ADDR_LEN]; | |
| void *a; | |
| top: | |
| LOG_DBG(" flags : %d", addr->ai_flags); | |
| LOG_DBG(" family : %d", addr->ai_family); | |
| LOG_DBG(" socktype: %d", addr->ai_socktype); | |
| LOG_DBG(" protocol: %d", addr->ai_protocol); | |
| LOG_DBG(" addrlen : %d", (int)addr->ai_addrlen); | |
| /* Assume two words. */ | |
| LOG_DBG(" addr[0]: 0x%lx", ((uint32_t *)addr->ai_addr)[0]); | |
| LOG_DBG(" addr[1]: 0x%lx", ((uint32_t *)addr->ai_addr)[1]); | |
| if (addr->ai_next != 0) { | |
| addr = addr->ai_next; | |
| goto top; | |
| } | |
| a = &net_sin(addr->ai_addr)->sin_addr; | |
| LOG_INF(" Got %s", | |
| log_strdup(net_addr_ntop(addr->ai_family, a, | |
| hr_addr, sizeof(hr_addr)))); | |
| } | |
| void do_sntp(struct addrinfo *addr) | |
| { | |
| struct sntp_ctx ctx; | |
| int rc; | |
| s64_t stamp; | |
| struct sntp_time sntp_time; | |
| char time_str[sizeof("1970-01-01T00:00:00")]; | |
| LOG_INF("Sending NTP request for current time:"); | |
| /* Initialize sntp */ | |
| rc = sntp_init(&ctx, addr->ai_addr, sizeof(struct sockaddr_in)); | |
| if (rc < 0) { | |
| LOG_ERR("Unable to init sntp context: %d", rc); | |
| return; | |
| } | |
| rc = sntp_query(&ctx, K_FOREVER, &sntp_time); | |
| if (rc == 0) { | |
| stamp = k_uptime_get(); | |
| time_base = sntp_time.seconds * MSEC_PER_SEC - stamp; | |
| /* Convert time to make sure. */ | |
| time_t now = sntp_time.seconds; | |
| struct tm now_tm; | |
| gmtime_r(&now, &now_tm); | |
| strftime(time_str, sizeof(time_str), "%FT%T", &now_tm); | |
| LOG_INF(" Acquired time: %s", log_strdup(time_str)); | |
| } else { | |
| LOG_ERR(" Failed to acquire SNTP, code %d\n", rc); | |
| } | |
| sntp_close(&ctx); | |
| } | |
| void main(void) | |
| { | |
| static struct addrinfo hints; | |
| struct addrinfo *haddr; | |
| int res; | |
| int cnt = 0; | |
| int rc; | |
| app_dhcpv4_startup(); | |
| LOG_INF("Should have DHCPv4 lease at this point."); | |
| hints.ai_family = AF_INET; | |
| hints.ai_socktype = SOCK_DGRAM; | |
| hints.ai_protocol = 0; | |
| //while ((res = getaddrinfo("console.aws.amazon.com", "80", &hints, | |
| while ((res = getaddrinfo("time.google.com", "123", &hints, | |
| &haddr)) && cnt < 3) { | |
| LOG_ERR("Unable to get address for NTP server, retrying"); | |
| cnt++; | |
| } | |
| if (res != 0) { | |
| LOG_ERR("Unable to get address of NTP server, exiting %d", res); | |
| return; | |
| } | |
| LOG_INF("DNS resolved for console.aws.amazon.com:80"); | |
| show_addrinfo(haddr); | |
| do_sntp(haddr); | |
| #if defined(CONFIG_MQTT_LIB_TLS) | |
| rc = tls_init(); | |
| PRINT_RESULT("tls_init", rc); | |
| #endif | |
| mqtt_startup("agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com", 8883); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment