Commit 8e50fa01 authored by abc's avatar abc

Add second service to call each other

parent e8af48c5
Pipeline #1977 failed with stages
package main
import (
"context"
"demootel/common/otelutil"
"demootel/demo2"
demohttp "demootel/demo2/api/http"
"demootel/demo2/tracing"
"errors"
"fmt"
"go.opentelemetry.io/otel"
"net/http"
"time"
)
//func newExporter(ctx context.Context) (trace.SpanExporter, error) {
// return otlptracehttp.New(ctx)
//}
//func initTracer() (*trace.TracerProvider, error) {
// tp := trace.NewTracerProvider(
// trace.WithSampler(trace.AlwaysSample()), // Adjust sampling strategy as needed
// )
// otel.SetTracerProvider(tp)
// return tp, nil
//}
func main() {
// Set up OpenTelemetry.
otelShutdown, err := otelutil.SetupOTelSDK(context.Background())
if err != nil {
return
}
// Handle shutdown properly so nothing leaks.
defer func() {
err = errors.Join(err, otelShutdown(context.Background()))
}()
//tp, err := initTracer()
//if err != nil {
// panic(err)
//}
//defer func() {
// err := tp.Shutdown(context.Background())
// if err != nil {
// panic(err)
// }
//}()
ts := time.Now().UnixMilli()
fmt.Printf("Ts: %d\n", ts)
svc := newService()
svc = tracing.Demo2TracingMiddleware(svc, otel.Tracer("demo2-vdn"))
errs := make(chan error, 2)
httpPort := "8002"
go startHTTPServer(demohttp.MakeHandler(svc), httpPort, errs)
fmt.Printf("Listening on port: %s", httpPort)
err = <-errs
}
// newService create new instantiate
func newService() demo2.Service {
// Create new pricing service
svc := demo2.NewNoDb()
return svc
}
// Start HTTP Server
func startHTTPServer(handler http.Handler, port string, errs chan error) {
p := fmt.Sprintf(":%s", port)
errs <- http.ListenAndServe(p, handler)
}
...@@ -4,7 +4,10 @@ import ( ...@@ -4,7 +4,10 @@ import (
"context" "context"
"errors" "errors"
"fmt" "fmt"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel" "go.opentelemetry.io/otel"
"io/ioutil"
"net/http"
"time" "time"
) )
...@@ -31,6 +34,39 @@ func (svc demoOtelService) Call(ctx context.Context) (interface{}, error) { ...@@ -31,6 +34,39 @@ func (svc demoOtelService) Call(ctx context.Context) (interface{}, error) {
ctx, span := otel.Tracer("service-demo-call").Start(ctx, "Demo-Call") ctx, span := otel.Tracer("service-demo-call").Start(ctx, "Demo-Call")
fmt.Printf("Call\n") fmt.Printf("Call\n")
// URL of the server endpoint
url := "http://localhost:8002/demo2-otel/call"
// Create a new HTTP client
client := http.Client{Transport: otelhttp.NewTransport(http.DefaultTransport)}
// Create a new HTTP request (POST)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, nil) // nil for no body
if err != nil {
panic(err)
}
// Set headers if needed (optional)
// req.Header.Set("Content-Type", "application/json") // Example header
// Send the request and get the response
resp, err := client.Do(req)
if err != nil {
panic(err)
}
defer resp.Body.Close()
// Read the response body (optional)
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
panic(err)
}
// Print the response status code and body (if read)
fmt.Println("Status Code:", resp.StatusCode)
if body != nil {
fmt.Println("Response Body:", string(body))
}
defer span.End() defer span.End()
return "OK", nil return "OK", nil
} }
......
package http
import (
"context"
"demootel/demo2"
"fmt"
"github.com/go-kit/kit/endpoint"
)
func demo2OtelCallEndpoint(svc demo2.Service) endpoint.Endpoint {
return func(ctx context.Context, request interface{}) (interface{}, error) {
fmt.Printf("demo2OtelCallEndpoint\n")
//parentCtx := context.Background() // Assuming context is not already set
// Extract existing trace context from incoming request (if any)
//parentSpan := trace.SpanFromContext(ctx)
//if parentSpan != nil {
// parentCtx = trace.ContextWithSpan(parentCtx, parentSpan)
//}
//ctx, span := otel.Tracer("service").Start(parentCtx, "Call")
//defer span.End()
res, err := svc.Call(ctx)
if err != nil {
return nil, err
}
return res, nil
}
}
package http
import (
"context"
"demootel/common/errors"
"demootel/demo2"
"encoding/json"
"fmt"
kithttp "github.com/go-kit/kit/transport/http"
"github.com/go-zoo/bone"
"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"net/http"
"net/http/pprof"
)
const (
contentType = "application/json"
offset = "offset"
limit = "limit"
name = "name"
metadata = "metadata"
defOffset = 0
defLimit = 100
)
var (
errUnsupportedContentType = errors.New("unsupported content type")
errNotFoundUserId = errors.New("not found user Id")
)
// MakeHandler returns a HTTP handler for API endpoints.
func MakeHandler(svc demo2.Service) *bone.Mux {
//opts := []kithttp.ServerOption{
// kithttp.ServerErrorEncoder(encodeError),
//}
r := bone.New()
r.HandleFunc("/debug/pprof/", pprof.Index)
r.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
r.HandleFunc("/debug/pprof/profile", pprof.Profile)
r.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
r.HandleFunc("/debug/pprof/trace", pprof.Trace)
r.Post("/demo2-otel/call", otelhttp.NewHandler(kithttp.NewServer(
demo2OtelCallEndpoint(svc),
decodeIotEventSave,
encodeResponse,
), "demo2-otel/call",
otelhttp.WithMessageEvents(otelhttp.ReadEvents, otelhttp.WriteEvents)))
return r
}
func decodeIotEventSave(_ context.Context, r *http.Request) (interface{}, error) {
//if !strings.Contains(r.Header.Get("Content-Type"), contentType) {
// return nil, errUnsupportedContentType
//}
//req := iotevent.IotEventSaveReq{}
//if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
// return nil, errors.Wrap(iotevent.ErrMalformedEntity, err)
//}
////parse UserId from Header. Override UserId in request body if any
//userId := r.Header.Get("X-User")
//if userId == "" {
// return nil, errNotFoundUserId
//}
//req.UserId = userId
fmt.Printf("decodeIotEventSave\n")
return "Nothing", nil
}
// Encode
func encodeResponse(_ context.Context, w http.ResponseWriter, response interface{}) error {
w.Header().Set("Content-Type", contentType)
err := json.NewEncoder(w).Encode(response)
return err
}
package demo2
import (
"context"
"errors"
"fmt"
"go.opentelemetry.io/otel"
"time"
)
var (
// ErrMalformedEntity indicates malformed entity specification (e.g. invalid username or password).
ErrMalformedEntity = errors.New("malformed entity specification")
ErrConflict = errors.New("conflict")
ErrUnauthorizedAccess = errors.New("ErrUnauthorizedAccess")
ErrForbidden = errors.New("ErrForbidden")
)
var loc, _ = time.LoadLocation("Asia/Ho_Chi_Minh")
type Service interface {
Call(ctx context.Context) (interface{}, error)
}
var _ Service = (*demo2OtelService)(nil)
type demo2OtelService struct {
}
func (svc demo2OtelService) Call(ctx context.Context) (interface{}, error) {
ctx, span := otel.Tracer("service-demo2-call").Start(ctx, "Demo2-Call")
fmt.Printf("Call2\n")
defer span.End()
return "OK", nil
}
func NewNoDb() Service {
return &demo2OtelService{}
}
type Response struct {
Code int `json:"code"`
Msg string `json:"msg"`
Data interface{} `json:"data,omitempty"`
}
package tracing
import (
"context"
"demootel/demo2"
"go.opentelemetry.io/otel/trace"
)
const (
callOp = "call2_op"
)
type demo2TracingMiddleware struct {
svc demo2.Service
tracer trace.Tracer
}
func Demo2TracingMiddleware(svc demo2.Service, tracer trace.Tracer) demo2.Service {
return &demo2TracingMiddleware{svc, tracer}
}
func (m demo2TracingMiddleware) Call(ctx context.Context) (interface{}, error) {
ctx, span := createSpan(ctx, m.tracer, callOp)
defer span.End()
return m.svc.Call(ctx)
}
func createSpan(ctx context.Context, tracer trace.Tracer, opName string) (context.Context, trace.Span) {
return tracer.Start(ctx, opName) //Create nested span (parents-child) automatically if the current context has a span already
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment