Installation of Micronaut on macOS and Linux.

Micronaut is a full-stack framework for developing cloud-native, microservice-based applications in Java, Kotlin, or Groovy.

Let us walk through the steps required to install Micronaut on macOS.

For a simple and effortless start on macOS or Linux, you can use SDKMAN! (The Software Development Kit Manager) to download and configure any Micronaut version of your choice.

INSTALLING WITH SDKMAN:

This tool makes it easy to install Micronaut on any Unix-based platform such as Linux or macOS.

Open a terminal and install SDKMAN:

$  curl -s https://get.sdkman.io | bash

Follow the on-screen instructions to complete installation.

Once SDKMAN is installed, run the command below to load it into the current shell.

 $ source "$HOME/.sdkman/bin/sdkman-init.sh"

Once these two steps are done, install Micronaut using SDKMAN:

$ sdk install micronaut

After the installation completes, validate it with the command below.

$ mn --version

These are the simple steps with SDKMAN.

INSTALLING WITH HOMEBREW:

Before installing with Homebrew, update Homebrew itself:

$ brew update

To install Micronaut, run the following command:

$ brew install micronaut

After the installation completes, validate it with the command below.

$ mn --version

INSTALLING WITH MACPORTS:

Before installing, it is recommended to sync the latest Portfiles to avoid any issues:

$ sudo port sync

To install Micronaut, run the following command:

$ sudo port install micronaut

After the installation completes, validate it with the command below.

$ mn --version

These are three different ways to set up the Micronaut framework on macOS and Linux-based operating systems.
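Whichever installer you used, the mn command has to be on your PATH before mn --version can work. The snippet below is a minimal sketch of that check; check_tool is a helper name made up for this example and is not part of SDKMAN, Homebrew, or MacPorts.

```shell
# check_tool NAME: report whether NAME can be found on the PATH.
# check_tool is a hypothetical helper written for this sketch.
check_tool() {
    if command -v "$1" >/dev/null 2>&1; then
        printf '%s found at %s\n' "$1" "$(command -v "$1")"
    else
        printf '%s not found on PATH\n' "$1"
        return 1
    fi
}

# If mn is missing, the shell has probably not picked up the installer's PATH changes yet.
check_tool mn || echo 'Open a new terminal (or re-run the sdkman-init.sh step) and try again.'
```

If the check succeeds, mn --version should then print the installed version.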

Spring Tools 4 for Visual Studio Code.

Visual Studio Code is one of the most popular open-source, lightweight editors available. Spring Boot is another popular and powerful framework in the Java ecosystem. It has gained popularity because of its simplicity and the way it bootstraps development. Spring Boot is very handy for developing microservice-based applications and also supports cloud-native development.

This article is for those who want to use their VS Code editor to develop Spring applications. Spring provides tools that support the development of Spring-based applications in VS Code.

Spring gives developers real flexibility: as a Spring developer you don’t need any special editor, IDE, OS, or tool suite.

Spring is nature’s way of saying, ‘Let’s party!’

In the spirit of that quote, the Spring framework is really telling developers: don’t worry, let’s party, I will take care of everything.

If you have landed on this article, you are probably already familiar with the Visual Studio Code editor. If you haven’t installed it yet, you can download Visual Studio Code from the link below:

https://code.visualstudio.com/

Configure the Spring boot with Visual Studio Code

After installing the VS Code editor on your local system, consider the points below to configure Spring Boot with VS Code.

I will assume you have completed the following setup:

  1. VS code installation.
  2. Java extension for VS Code.
  3. Kotlin extension, if you want to develop a Spring Boot app using Kotlin.

Once all of the above is done, open VS Code, go to the Extensions view, and search for “spring”. You should see the Spring extensions listed in the search results.

Click and install “Spring Boot Extension Pack”. Once the installation is done, reload VS Code.

The Spring Boot Extension Pack is a collection of extensions for developing and deploying Spring Boot applications:

  1. Spring Boot Tools.
  2. Spring Initializr Java Support.
  3. Spring Boot Dashboard.

Spring Boot Tools:

VSCode extension and Language Server providing support for working with Spring Boot application.properties, application.yml and .java files.

Spring Initializr Java Support:

Spring Initializr is a lightweight extension to quickly generate a Spring Boot project in Visual Studio Code (VS Code). It helps you to customize your projects with configurations and manage Spring Boot dependencies.

Spring Boot Dashboard:

Spring Boot Dashboard is a lightweight extension in Visual Studio Code (VS Code). With an explorer in the side bar, you can view and manage all available Spring Boot projects in your workspace. It also supports the features to quickly start, stop or debug a Spring Boot project.

Feature List

  • View Spring Boot apps in workspace
  • Start / Stop a Spring Boot app
  • Debug a Spring Boot app
  • Open a Spring Boot app in the browser
  • Generate a Maven/Gradle Spring Boot project
  • Customize configurations for a new project (language, group id, artifact id, boot version, and dependencies)
  • Search for dependencies
  • Quickstart with last settings
  • Edit Spring Boot dependencies of an existing Spring Boot project

The extension pack contains:

  1. IDE Java tooling for developing and troubleshooting Spring Boot applications.
  2. It provides support for editing Cloud Foundry deployment manifest .yml files for Spring Boot application deployment.
  3. The Concourse CI Pipeline Editor provides support for setting up Concourse build pipeline for the Spring Boot application.
  4. It provides support for generating quickstart Spring Boot Java projects with the Spring Initializr API.
  5. It provides an explorer in the sidebar where you can view all of a workspace’s Spring Boot projects conveniently in one place.

That is it for the installation.

If you want to create an application using VS Code, check the video below.

Spring Boot in VS Code

Using Spring Initializr

  • Launch VS Code
  • Press Ctrl + Shift + P to open the command palette.
  • Type Spring Initializr to start generating a Maven or Gradle project.
  • Follow the wizard.
  • Right-click inside the pom.xml file and choose Edit starters for dependency refactoring. (Gradle project is not supported yet, PR is welcome for it.)

Shortcuts may differ depending on the operating system.


Configuration as a Service: Spring Cloud Config using Kotlin.

Developing a microservice architecture with Java and Spring Boot is quite popular these days. In a microservice architecture we may have hundreds of services, and managing the configuration for each service and each profile is quite a tedious task. In this article, we will demonstrate the Spring Cloud Config server using Kotlin.

Spring Boot provided a much-needed spark to the Spring projects.

Spring Cloud Config provides server-side and client-side support for externalized configuration in a distributed system. With the Config Server, you have a central place to manage external properties for applications across all environments.

In distributed systems, managing configuration is a tedious task; Spring Cloud Config provides a client-server mechanism that turns it into a central service and makes it easier to manage.

Let us go to https://start.spring.io/ and create the config server project.

Whenever we change the configuration of a service, we have to restart that service to apply the changes.

Let us create a Git repository to manage our configuration.

We will create “springbootclient”, a small Spring Boot microservice that reads a username from the central configuration store behind Spring Cloud Config (Git, in this case).

We have created three properties files, one for each of our environments.

  1. springbootclient.properties
  2. springbootclient-dev.properties
  3. springbootclient-prod.properties

https://github.com/maheshwarLigade/cloud-common-config-server

Our Spring Cloud Config properties are available here; you can clone this repository or use it directly.

Now that we have created the Spring Cloud Config server application using the Spring starter, download the project and import it into your favorite IDE or editor. The Git repository stores our configuration, and the Spring Cloud Config server application serves those properties to the clients.

Basically, Git is the datastore, Spring Cloud Config Server is the server application, and the microservices that need configuration are the clients.

Now our Git datastore is ready. In this repository, we have created one sample client application named springbootclient. In a future microservice article we will use the same Spring Cloud Config setup as the configuration server.

Let us go and check the code base for the config server app.

This is the sample application.properties file:

server.port=8888
logging.level.org.springframework.cloud.config=DEBUG
spring.cloud.config.server.git.uri=https://github.com/maheshwarLigade/cloud-common-config-server.git
spring.cloud.config.server.git.clone-on-start=true
spring.cloud.config.server.git.searchPaths=springbootclient

Sample Code for SpringCloudConfigServerexApplication.kt

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication
import org.springframework.cloud.config.server.EnableConfigServer

@SpringBootApplication
@EnableConfigServer
class SpringCloudConfigServerexApplication

fun main(args: Array<String>) {
   runApplication<SpringCloudConfigServerexApplication>(*args)
}

Now start the Spring Cloud Config server and check the URL below:

http://localhost:8888/springbootclient/dev/master
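The URL above follows the config server's /{application}/{profile}/{label} pattern: springbootclient is the application name, dev the profile, and master the Git branch. As a small sketch, the helper below (config_url is a name made up for this example) builds that URL from its parts so you can try other profiles.

```shell
# config_url BASE APPLICATION PROFILE LABEL: print the config server URL for that combination.
# config_url is a hypothetical helper; a trailing slash on BASE is dropped.
config_url() {
    printf '%s/%s/%s/%s\n' "${1%/}" "$2" "$3" "$4"
}

config_url http://localhost:8888 springbootclient dev master
# prints http://localhost:8888/springbootclient/dev/master
```

With the server running, curl -s "$(config_url http://localhost:8888 springbootclient prod master)" should return a JSON document listing the property sources found for the prod profile.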

Spring Boot Client App:

Let us create one small microservice that reads its configuration from the Spring Cloud Config server and serves that property value over a REST endpoint.

Go to https://start.spring.io/ and create the Spring Boot client microservice using Kotlin.

Sample pom.xml dependencies:

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
   <groupId>com.fasterxml.jackson.module</groupId>
   <artifactId>jackson-module-kotlin</artifactId>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-reflect</artifactId>
</dependency>
<dependency>
   <groupId>org.jetbrains.kotlin</groupId>
   <artifactId>kotlin-stdlib-jdk8</artifactId>
</dependency>
<dependency>
   <groupId>org.springframework.cloud</groupId>
   <artifactId>spring-cloud-starter-config</artifactId>
</dependency>

Now check the SpringCloudClientAppApplication.kt code

import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.runApplication

@SpringBootApplication
class SpringCloudClientAppApplication

fun main(args: Array<String>) {
    runApplication<SpringCloudClientAppApplication>(*args)
}

Now create a sample REST controller to serve REST requests. We want the “/whoami” endpoint to return the user configured for the active profile (dev, prod, etc.).

UserController.kt

import org.springframework.beans.factory.annotation.Value
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.RestController


@RestController
class UserController {

    // Injected from the config server; "Test" is overwritten when Spring injects the property
    @Value("\${app.adminusername}")
    var username = "Test"

    // Serves GET /whoami
    @GetMapping("/whoami")
    fun whoami() = "I am a $username"

}

Create a bootstrap.properties file in which we specify the Spring Cloud Config server details: the server URI, the Git branch (label), and the active profile (dev, local, prod, etc.).

spring.application.name=springbootclient
spring.profiles.active=dev
spring.cloud.config.uri=http://localhost:8888
spring.cloud.config.fail-fast=true
spring.cloud.config.label=master

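The properties are largely self-explanatory: spring.application.name selects which properties file the server looks up, spring.profiles.active selects the profile, spring.cloud.config.uri points at the config server, spring.cloud.config.fail-fast makes startup fail if the config server cannot be reached, and spring.cloud.config.label selects the Git branch.

Now hit the URL http://localhost:9080/whoami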

Output:- I am a DevUser
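The value comes from springbootclient-dev.properties because the dev profile is active; a key missing from the profile-specific file would fall back to springbootclient.properties. The script below is only a rough simulation of that precedence with plain files, not how Spring implements it. The lookup function, the qa profile, and the DefaultUser value are made up for this illustration.

```shell
# lookup DIR APP PROFILE KEY: print the value of KEY, checking APP-PROFILE.properties
# first and falling back to APP.properties. A simulation only; it expects plain
# "key=value" lines without spaces around "=".
lookup() {
    dir=$1 app=$2 profile=$3 key=$4
    for file in "$dir/$app-$profile.properties" "$dir/$app.properties"; do
        [ -f "$file" ] || continue
        # Print the text after "KEY=" on the first line whose key matches exactly.
        if value=$(awk -F= -v k="$key" '
            $1 == k { print substr($0, length(k) + 2); found = 1; exit }
            END     { exit !found }
        ' "$file"); then
            printf '%s\n' "$value"
            return 0
        fi
    done
    return 1
}

# Demo files; DefaultUser is an invented value, DevUser matches the output above.
demo_dir=$(mktemp -d)
printf 'app.adminusername=DefaultUser\n' > "$demo_dir/springbootclient.properties"
printf 'app.adminusername=DevUser\n' > "$demo_dir/springbootclient-dev.properties"

lookup "$demo_dir" springbootclient dev app.adminusername   # prints DevUser
lookup "$demo_dir" springbootclient qa app.adminusername    # prints DefaultUser (no qa file exists)
```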

Github source link:

Config Server: https://github.com/maheshwarLigade/cloud-common-config-server

Codebase: https://github.com/maheshwarLigade/spring-cloud-config-kotlin-ex


Enable Spring Security using Kotlin!

Spring Security is the de facto abstraction in the Spring framework world for adding an authentication and authorization layer to your application. There are plenty of examples; in this article, we set up Spring Security using Kotlin.

In the last article, we developed a Spring Boot and MongoDB REST API using Kotlin. We will use the same example and add the Spring Security dependency.

https://github.com/maheshwarLigade/springboot-mongodb.restapi/tree/master

To illustrate the Spring Security example, add the Spring Security starter dependency.

implementation("org.springframework.boot:spring-boot-starter-security")

If you prefer the Maven build tool, use the dependency below.

<dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-security</artifactId>
</dependency>

Our existing example is a REST API written in Kotlin that holds patient data and exposes CRUD operations, so we will have two different roles, ADMIN and DOCTOR.

To keep things simple, we will have only two roles and only one endpoint, “/patients”, with five different operations. Endpoints can be secured either with the @Secured annotation or with a config class; here we use a config class.

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.http.HttpMethod
import org.springframework.security.config.annotation.authentication.builders.AuthenticationManagerBuilder
import org.springframework.security.config.annotation.web.builders.HttpSecurity
import org.springframework.security.config.annotation.web.configuration.WebSecurityConfigurerAdapter
import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder
import org.springframework.security.crypto.password.PasswordEncoder

@Configuration
class SecurityConfig : WebSecurityConfigurerAdapter() {

    @Bean
    fun encoder(): PasswordEncoder {
        return BCryptPasswordEncoder()
    }

    override fun configure(auth: AuthenticationManagerBuilder) {
        auth.inMemoryAuthentication()
                .withUser("admin")
                .password(encoder().encode("pass"))
                .roles("DOCTOR", "ADMIN")
                .and()
                .withUser("doctor")
                .password(encoder().encode("pass"))
                .roles("DOCTOR")
    }

    @Throws(Exception::class)
    override fun configure(http: HttpSecurity) {
        http.httpBasic()
                .and()
                .authorizeRequests()
                .antMatchers(HttpMethod.GET, "/patients").hasRole("ADMIN")
                .antMatchers(HttpMethod.POST, "/patients/**").hasRole("ADMIN")
                .antMatchers(HttpMethod.PUT, "/patients/**").hasRole("ADMIN")
                .antMatchers(HttpMethod.DELETE, "/patients/**").hasRole("ADMIN")
                .antMatchers(HttpMethod.GET, "/patients/**").hasAnyRole("ADMIN", "DOCTOR")
                .and()
                .csrf().disable()
                .formLogin().disable()
    }

}

We have five operations and two roles, ADMIN and DOCTOR.

If you don’t want to hash the password, you can use a plain-text password with the {noop} prefix.

In the first configure method we declared the two users; in the second we configured which endpoints and HTTP methods we want to secure.

In this example we have two GET endpoints: the first returns all patient records and the second returns a specific patient record. This is a very good use case because:

  1. Only ADMIN can access the endpoint that gets all patients.
  2. The endpoint that gets a particular patient is accessible to any user with the DOCTOR role, and to ADMIN, who has access to everything.
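The matchers are evaluated in order and the first one that matches decides, which is why the exact GET "/patients" rule is listed before the broader GET "/patients/**" rule. The function below is a plain-shell simulation of the rules configured above, meant only as a way to reason about which status to expect. It is not Spring Security code, and access is a name made up for this sketch.

```shell
# access USER METHOD PATH: print "ok", "401" or "403" for the rules configured above.
# USER is admin, doctor, or anything else for a request without valid credentials.
access() {
    user=$1 method=$2 path=$3
    # Roles required by the first matching rule, in the same order as the config class.
    case "$method $path" in
        "GET /patients")                          required="ADMIN" ;;
        "POST /patients" | "POST /patients/"*)     required="ADMIN" ;;
        "PUT /patients" | "PUT /patients/"*)       required="ADMIN" ;;
        "DELETE /patients" | "DELETE /patients/"*) required="ADMIN" ;;
        "GET /patients/"*)                        required="ADMIN DOCTOR" ;;
        *) echo ok; return ;;    # no matcher applies, so the request is not restricted
    esac
    # Roles held by each in-memory user.
    case "$user" in
        admin)  held="DOCTOR ADMIN" ;;
        doctor) held="DOCTOR" ;;
        *)      echo 401; return ;;    # not authenticated
    esac
    for role in $held; do
        case " $required " in *" $role "*) echo ok; return ;; esac
    done
    echo 403    # authenticated, but without a required role
}

access doctor GET /patients      # prints 403
access doctor GET /patients/42   # prints ok
```

Against the running application, curl -u doctor:pass localhost:8090/patients should therefore be rejected with 403, while the same user can fetch a single patient by id.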

csrf().disable() disables Spring Security’s built-in cross-site request forgery (CSRF) protection.

formLogin().disable() is about disabling default login form.

Now everything is ready and our REST endpoints are secured. If you hit the REST endpoints the way you did before, you will get a 401 or 403 HTTP status code. Try it with the curl command.

$ curl localhost:8090/patients 
{   
"timestamp": "2020-04-12T05:37:16.718+0000",
"status": 401,   
"error": "Unauthorized",   
"message": "Unauthorized",   
"path": "/patients"
}

$ curl localhost:8090/patients -u admin:pass

Use the above command with credentials and you will get a successful result.

That is it for now. I know this is a very simple use case and example, but you can use the same structure and scale it to the next level using a database with different user roles and access control lists.

GitHub code repo:

https://github.com/maheshwarLigade/springboot-mongodb.restapi/tree/master

Spring Boot, MongoDB REST API using Kotlin.

In this article our focus is on developing a simple REST API using Spring Boot and MongoDB.

The starting point is the Spring Initializr tool: https://start.spring.io/

In this example, I am using Gradle as the build tool and MongoDB as the database.

Download the project and import it into your favorite editor; I prefer IntelliJ.

You can either install MongoDB locally or use a hosted MongoDB solution such as https://mlab.com/.

I am using mlab.com for this example.

Let us provide the MongoDB connection details in the application.properties

# for now I kept localhost
spring.data.mongodb.host=localhost
spring.data.mongodb.port=27017
spring.data.mongodb.database=mongo-rest-api-kotlin-demo

Let us create entity class as Patient.

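import java.time.LocalDateTime
import org.bson.types.ObjectId
import org.springframework.data.annotation.Id
import org.springframework.data.mongodb.core.mapping.Document

@Document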
data class Patient (
        @Id
        val id: ObjectId = ObjectId.get(),
        val name: String,
        val description: String,
        val createdDate: LocalDateTime = LocalDateTime.now(),
        val modifiedDate: LocalDateTime = LocalDateTime.now()
)

The @Document annotation, rather than @Entity, is used here to mark a class whose objects we’d like to persist to MongoDB.

@Id is used to mark the field used for identification.

Also, we have provided some default values for the created date and modified date.

Let us create a repository interface.

import org.bson.types.ObjectId
import org.springframework.data.mongodb.repository.MongoRepository

interface PatientRepository : MongoRepository<Patient, String> {
    fun findOneById(id: ObjectId): Patient
    override fun deleteAll()

}

The repository interface is ready to use; we don’t have to write an implementation for it. This feature is provided by Spring Data. Also, the MongoRepository interface provides all the basic methods for CRUD operations. For now, we will consider only findOneById.

Now our backend is ready, let us write down the REST Controller which will serve our request efficiently.

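import org.bson.types.ObjectId
import org.springframework.http.ResponseEntity
import org.springframework.web.bind.annotation.GetMapping
import org.springframework.web.bind.annotation.PathVariable
import org.springframework.web.bind.annotation.RequestMapping
import org.springframework.web.bind.annotation.RestController

@RestController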
@RequestMapping("/patients")
class PatientController(
        private val patientsRepository: PatientRepository
) {

    @GetMapping
    fun getAllPatients(): ResponseEntity<List<Patient>> {
        val patients = patientsRepository.findAll()
        return ResponseEntity.ok(patients)
    }

    @GetMapping("/{id}")
    fun getOnePatient(@PathVariable("id") id: String): ResponseEntity<Patient> {
        val patient = patientsRepository.findOneById(ObjectId(id))
        return ResponseEntity.ok(patient)
    }
}

Now our basic Controller is ready, let us write some test cases.

@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT)
@ExtendWith(SpringExtension::class)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class PatientControllerIntTest @Autowired constructor(
        private val patientRepository: PatientRepository,
        private val restTemplate: TestRestTemplate
) {
    private val defaultPatientId = ObjectId.get()

    @LocalServerPort
    protected var port: Int = 0

    @BeforeEach
    fun setUp() {
        patientRepository.deleteAll()
    }


    private fun getRootUrl(): String? = "http://localhost:$port/patients"

    private fun saveOnePatient() = patientRepository.save(Patient(defaultPatientId, "Name", "Description"))

    @Test
    fun `should return all patients`() {
        saveOnePatient()

        val response = restTemplate.getForEntity(
                getRootUrl(),
                List::class.java
        )

        assertEquals(200, response.statusCode.value())
        assertNotNull(response.body)
        assertEquals(1, response.body?.size)
    }

    @Test
    fun `should return single patient by id`() {
        saveOnePatient()

        val response = restTemplate.getForEntity(
                getRootUrl() + "/$defaultPatientId",
                Patient::class.java
        )

        assertEquals(200, response.statusCode.value())
        assertNotNull(response.body)
        assertEquals(defaultPatientId, response.body?.id)
    }
}

Here we use Spring Boot Test for integration testing, with SpringBootTest.WebEnvironment.RANDOM_PORT so that the application starts on a random free port.

Note:

https://kotlinlang.org/docs/reference/coding-conventions.html#naming-rules

Please keep the naming conventions in mind while writing test cases in Kotlin; in tests, method names may contain spaces when enclosed in backticks, as in the examples above.

In the JVM world, similar conventions are well known in Groovy and Scala.

Always start with simple steps: we write the GET operations first and try to fetch all patient details.

Run the application and hit the http://localhost:8090/patients endpoint.

Let us create a POST request.

Create a simple request object that will help us create an entity in the Mongo world.

class PatientRequest(
        val name: String,
        val description: String
)

Here we pass the patient’s name and a description of the treatment.

Now go to the REST Controller and handle a POST request.

@PostMapping
fun createPatient(@RequestBody request: PatientRequest): ResponseEntity<Patient> {
    val patient = patientsRepository.save(Patient(
            name = request.name,
            description = request.description
    ))
    return ResponseEntity(patient, HttpStatus.CREATED)
}
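To try the POST handler from a terminal, you need a JSON body with the two PatientRequest fields. The helper below is a minimal sketch that builds such a body; patient_json is a name invented for this example, and it assumes the values contain no double quotes or backslashes.

```shell
# patient_json NAME DESCRIPTION: print a JSON body matching PatientRequest.
# Hypothetical helper; no escaping is done, so keep quotes and backslashes out of the values.
patient_json() {
    printf '{"name":"%s","description":"%s"}\n' "$1" "$2"
}

patient_json "Name" "Description"
# prints {"name":"Name","description":"Description"}
```

With the application running on port 8090, curl -X POST -H 'Content-Type: application/json' -d "$(patient_json "Name" "Description")" localhost:8090/patients should answer with status 201 and the stored patient.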

Let us add a PUT method to handle amendments to a document.

@PutMapping("/{id}")
fun updatePatient(@RequestBody request: PatientRequest, @PathVariable("id") id: String): ResponseEntity<Patient> {
    val patient = patientsRepository.findOneById(ObjectId(id))
    val updatedPatient = patientsRepository.save(Patient(
            id = patient.id,
            name = request.name,
            description = request.description,
            createdDate = patient.createdDate,
            modifiedDate = LocalDateTime.now()
    ))
    return ResponseEntity.ok(updatedPatient)
}

Test Method for an Update operation.

@Test
fun `should update existing patient`() {
    saveOnePatient()
    // preparePatientRequest() is assumed to be a helper in this test class that returns a PatientRequest
    val patientRequest = preparePatientRequest()

    val updateResponse = restTemplate.exchange(
            getRootUrl() + "/$defaultPatientId",
            HttpMethod.PUT,
            HttpEntity(patientRequest, HttpHeaders()),
            Patient::class.java
    )
    // Reload the document to check what was actually persisted
    val updatedPatient = patientRepository.findOneById(defaultPatientId)

    assertEquals(200, updateResponse.statusCode.value())
    assertEquals(defaultPatientId, updatedPatient.id)
    assertEquals(patientRequest.description, updatedPatient.description)
    assertEquals(patientRequest.name, updatedPatient.name)
}

Now our update operation is ready.

Let us Delete records using Delete operation.

As the deleted document won’t be included in the response, the 204 code will be returned.

@DeleteMapping("/{id}")
fun deletePatient(@PathVariable("id") id: String): ResponseEntity<Unit> {
    patientsRepository.deleteById(id)
    return ResponseEntity.noContent().build()
}

The test for the delete method is straightforward.

@Test
fun `should delete existing patient`() {
    saveOnePatient()

    val delete = restTemplate.exchange(
            getRootUrl() + "/$defaultPatientId",
            HttpMethod.DELETE,
            HttpEntity(null, HttpHeaders()),
            ResponseEntity::class.java
    )

    assertEquals(204, delete.statusCode.value())
    assertThrows(EmptyResultDataAccessException::class.java) { patientRepository.findOneById(defaultPatientId) }
}

Now all our CRUD operations are ready; run the application.

That is it for now. The code is available on GitHub:

https://github.com/maheshwarLigade/springboot-mongodb.restapi/tree/master

Kafka Connect QuickStart.

Understanding Kafka Connect.

Apache Kafka is a distributed streaming platform. If you are not familiar with Apache Kafka, you can refer to the articles below:

Kafka Installation guide

Kafka Idempotent Consumer

Kafka Idempotent Producer

Kafka Connect is an open-source component of Apache Kafka and another project in the Apache Kafka family. A common use case of Apache Kafka is consuming data from an RDBMS and sinking it into Hadoop. This framework is built on the principle of convention over configuration.

Kafka Connect is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems.

https://kafka.apache.org/documentation/

Advantages of Using Kafka Connect: 

Below are some benefits of Kafka Connect.

  1. Data-Centric Pipeline
  2. Flexibility and Scalability
  3. Reusability and Extensibility
  4. Distributed and standalone modes
  5. Streaming/batch integration
  6. REST interface for better operations
  7. Automatic offset management

Kafka Connect is a tool suite for scalably and reliably streaming data between Kafka and other external systems such as databases, key-value stores, search indexes, and file systems, using so-called Connectors.

Kafka Connect generally involves three main components: a source connector, a sink connector, and a Kafka topic.

A source connector collects data from a source system. Source systems can be entire databases, tables, or any message brokers. A source connector could also collect metrics from application servers into Kafka topics, making the data available for stream processing with low latency.

A sink connector delivers data from Kafka topics into other systems, which might be indexes such as Elasticsearch, batch systems such as Hadoop, or any other database.

Prerequisite: 

  • Java 8 and Maven installed, basic knowledge of Kafka, and Kafka installed and running on your system.
  • Kafka version used for this guide: kafka_2.10-0.10.2.1.
  • Download Kafka from the Apache Kafka website.

Kafka Connect Example:

In this example, we will take a very simple case to demonstrate the power of Kafka Connect: the basic file source connector and file sink connector.

  1. Start Zookeeper 
  2. Start Kafka 

Create a Kafka topic for our testing:

$KAFKA_HOME/bin/kafka-topics.sh \
  --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 \
  --partitions 1 \
  --topic my-connect-test

Kafka Connect currently supports two modes of execution: standalone and distributed.

In standalone mode, all work is performed in a single process. This standalone mode is for development.

$ bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]

Source Connector configurations

Provide a source configuration file; this should be in your home directory.

For the source connector, the reference configuration is below:

name=local-file-source-connector
connector.class=FileStreamSource
tasks.max=1
topic=my-connect-test
file=test-connector.txt

  • name – Unique name for the connector. Attempting to register again with the same name will fail.
  • connector.class – The Java class for the connector
  • tasks.max – The maximum number of tasks that should be created for this connector. The connector may create fewer tasks if it cannot achieve this level of parallelism.
  • key.converter – (optional) Override the default key converter set by the worker.
  • value.converter – (optional) Override the default value converter set by the worker.
  • topics – (sink connectors only) A comma-separated list of topics to use as input for this connector
  • topics.regex – (sink connectors only) A Java regular expression of topics to use as input for this connector

Sink Connector Configuration

For the sink connector, the reference configuration is below:

name=local-file-sink-connector
connector.class=FileStreamSink
tasks.max=1
file=test-sink-conector.txt
topics=my-connect-test

Now that the source and sink are ready, let us set up the worker configuration. Most of the properties are the same because both source and sink are file connectors.

Worker Configuration

For the worker, the reference configuration is below:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=20000
plugin.path=/share/java

  • bootstrap.servers – List of Kafka servers used to bootstrap connections to Kafka
  • offset.flush.interval.ms – Interval, in milliseconds, at which to try committing offsets for tasks.
  • plugin.path – List of paths separated by commas (,) that contain plugins (connectors, converters, transformations).
  • key.converter – Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the keys in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.
  • value.converter – Converter class used to convert between Kafka Connect format and the serialized form that is written to Kafka. This controls the format of the values in messages written to or read from Kafka, and since this is independent of connectors it allows any connector to work with any serialization format. Examples of common formats include JSON and Avro.

The config property below is specific to standalone mode:

  • offset.storage.file.filename – File to store offset data in.

For more on configuration, see the official documentation.

Now everything is ready:

  1. kafka/config/connect-standalone.properties 
  2. kafka/config/local-file-source-connector.properties
  3. kafka/config/local-file-sink-connector.properties
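If you prefer to create the two connector files from the shell, the sketch below writes them with the exact values shown earlier. CONFIG_DIR is a variable name made up for this example; point it at your Kafka config directory, or leave it unset to write into ./config.

```shell
# Directory to write into; CONFIG_DIR is an assumption of this sketch.
CONFIG_DIR=${CONFIG_DIR:-config}
mkdir -p "$CONFIG_DIR"

# Source connector: reads test-connector.txt and publishes each line to the topic.
cat > "$CONFIG_DIR/local-file-source-connector.properties" <<'EOF'
name=local-file-source-connector
connector.class=FileStreamSource
tasks.max=1
topic=my-connect-test
file=test-connector.txt
EOF

# Sink connector: writes every record from the topic to test-sink-conector.txt.
cat > "$CONFIG_DIR/local-file-sink-connector.properties" <<'EOF'
name=local-file-sink-connector
connector.class=FileStreamSink
tasks.max=1
file=test-sink-conector.txt
topics=my-connect-test
EOF
```

Note the difference between the two files: the file source connector takes a single topic, while the sink connector takes topics.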

The content of test-connector.txt:

{
  color: "red",
  value: "#f00"
}
{
  color: "green",
  value: "#0f0"
}
{
  color: "blue",
  value: "#00f"
}
{
  color: "cyan",
  value: "#0ff"
}
{
  color: "magenta",
  value: "#f0f"
}
{
  color: "yellow",
  value: "#ff0"
}
{
  color: "black",
  value: "#000"
}

You can add content to the same file later and check that the Kafka connector reads it and sinks the data into test-sink-conector.txt.

The source connector will automatically detect the changes and publish the content to Kafka.

Make sure to insert a newline at the end; otherwise, the source connector won’t consider the last line.
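Because a missing final newline is easy to overlook, it can help to append records through a small helper and to check the file before starting the connector. This is a minimal sketch; append_record and ends_with_newline are names invented for this example.

```shell
# append_record FILE LINE: append LINE followed by a newline to FILE.
append_record() {
    printf '%s\n' "$2" >> "$1"
}

# ends_with_newline FILE: succeed if FILE is non-empty and its last byte is a newline.
# Command substitution strips a trailing newline, so the result is empty in that case.
ends_with_newline() {
    [ -s "$1" ] && [ -z "$(tail -c 1 "$1")" ]
}
```

For example, run ends_with_newline test-connector.txt || echo 'add a newline at the end' before launching the connector, and use append_record test-connector.txt 'some new line' to add content while it is running.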

Now let us run Kafka Connect and check it:

$KAFKA_HOME/bin/connect-standalone.sh config/connect-standalone.properties config/local-file-source-connector.properties config/local-file-sink-connector.properties

This is a simple implementation of Kafka Connect.

Discover more Kafka connectors here: https://www.confluent.io/hub/

Reference documents:

https://docs.confluent.io/current/connect/quickstart.html

https://kafka.apache.org/documentation/#connect